summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2008-07-11 02:38:35 +0000
committerZac Medico <zmedico@gentoo.org>2008-07-11 02:38:35 +0000
commitda8c62b6941d052ce5c197bad19a1b289f94a331 (patch)
tree0d503443fce30e08f2be94688faa7fdaedabf325
parent8bcd4a33e914f362cec4b7421c019a0969aa7276 (diff)
downloadportage-da8c62b6941d052ce5c197bad19a1b289f94a331.tar.gz
portage-da8c62b6941d052ce5c197bad19a1b289f94a331.tar.bz2
portage-da8c62b6941d052ce5c197bad19a1b289f94a331.zip
Refactor and simplify the main task scheduling and poll loops:
* Make output handlers unregister themselves and call wait() to notify exit listeners immediately. This makes the exit listeners more useful for scheduling tasks. This makes the poll loop nice an clean because it just calls the handlers and then the handlers can do the scheduling when necessary. * Make SequentialTaskQueue.add() and addFront() trigger scheduling internally, so that it's more of a chain reaction than something that has to be done explicitly. svn path=/main/trunk/; revision=11013
-rw-r--r--pym/_emerge/__init__.py168
1 files changed, 74 insertions, 94 deletions
diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py
index 7f122bb4d..dce4ef8d8 100644
--- a/pym/_emerge/__init__.py
+++ b/pym/_emerge/__init__.py
@@ -1886,7 +1886,8 @@ class SpawnProcess(SubProcess):
f.flush()
f.close()
self.registered = False
- self._wait()
+ self.scheduler.unregister(self._reg_id)
+ self.wait()
return self.registered
def _dummy_handler(self, fd, event):
@@ -1908,7 +1909,8 @@ class SpawnProcess(SubProcess):
for f in files.values():
f.close()
self.registered = False
- self._wait()
+ self.scheduler.unregister(self._reg_id)
+ self.wait()
return self.registered
class EbuildFetcher(SpawnProcess):
@@ -2045,11 +2047,11 @@ class EbuildBuild(CompositeTask):
prefetcher.cancel()
elif prefetcher.poll() is None:
- waiting_msg = ("Fetching '%s' " + \
+ waiting_msg = "Fetching files " + \
"in the background. " + \
"To view fetch progress, run `tail -f " + \
"/var/log/emerge-fetch.log` in another " + \
- "terminal.") % prefetcher.pkg_path
+ "terminal."
msg_prefix = colorize("GOOD", " * ")
from textwrap import wrap
waiting_msg = "".join("%s%s\n" % (msg_prefix, line) \
@@ -2371,7 +2373,8 @@ class EbuildMetadataPhase(SubProcess):
for f in files.values():
f.close()
self.registered = False
- self._wait()
+ self.scheduler.unregister(self._reg_id)
+ self.wait()
if self.returncode == os.EX_OK:
metadata = izip(portage.auxdbkeys,
@@ -2519,7 +2522,8 @@ class EbuildPhase(SubProcess):
for f in files.values():
f.close()
self.registered = False
- self._wait()
+ self.scheduler.unregister(self._reg_id)
+ self.wait()
return self.registered
def _dummy_handler(self, fd, event):
@@ -2541,7 +2545,8 @@ class EbuildPhase(SubProcess):
for f in files.values():
f.close()
self.registered = False
- self._wait()
+ self.scheduler.unregister(self._reg_id)
+ self.wait()
return self.registered
def _set_returncode(self, wait_retval):
@@ -7720,9 +7725,11 @@ class SequentialTaskQueue(SlotObject):
def add(self, task):
self._task_queue.append(task)
+ self.schedule()
def addFront(self, task):
self._task_queue.appendleft(task)
+ self.schedule()
def schedule(self):
@@ -7814,12 +7821,29 @@ class PollLoop(object):
return True
+ def _poll_loop(self):
+
+ event_handlers = self._poll_event_handlers
+ poll = self._poll.poll
+ state_change = 0
+
+ while event_handlers:
+ for f, event in poll():
+ handler, reg_id = event_handlers[f]
+ if not handler(f, event):
+ state_change += 1
+
+ if not state_change:
+ raise AssertionError("tight loop")
+
def _register(self, f, eventmask, handler):
"""
@rtype: Integer
@return: A unique registration id, for use in schedule() or
unregister() calls.
"""
+ if f in self._poll_event_handlers:
+ raise AssertionError("fd %d is already registered" % f)
self._event_handler_id += 1
reg_id = self._event_handler_id
self._poll_event_handler_ids[reg_id] = f
@@ -7832,7 +7856,6 @@ class PollLoop(object):
self._poll.unregister(f)
del self._poll_event_handlers[f]
del self._poll_event_handler_ids[reg_id]
- self._schedule_tasks()
def _schedule(self, wait_id):
"""
@@ -7845,48 +7868,10 @@ class PollLoop(object):
handler_ids = self._poll_event_handler_ids
poll = self._poll.poll
- self._schedule_tasks()
-
while wait_id in handler_ids:
for f, event in poll():
handler, reg_id = event_handlers[f]
- if not handler(f, event):
- self._unregister(reg_id)
-
- def _schedule_tasks(self):
- return False
-
- def _schedule_main(self, wait=False):
-
- event_handlers = self._poll_event_handlers
- poll = self._poll.poll
- max_jobs = self._max_jobs
-
- state_change = 0
-
- if self._schedule_tasks():
- state_change += 1
-
- while event_handlers:
- jobs = self._jobs
-
- for f, event in poll():
- handler, reg_id = event_handlers[f]
- if not handler(f, event):
- state_change += 1
- self._unregister(reg_id)
-
- if jobs == self._jobs:
- continue
-
- if self._schedule_tasks():
- state_change += 1
-
- if not wait and self._jobs < max_jobs:
- break
-
- if not state_change:
- raise AssertionError("tight loop")
+ handler(f, event)
class Scheduler(PollLoop):
@@ -7904,7 +7889,7 @@ class Scheduler(PollLoop):
_fetch_log = "/var/log/emerge-fetch.log"
class _iface_class(SlotObject):
- __slots__ = ("fetch", "register", "schedule")
+ __slots__ = ("fetch", "register", "schedule", "unregister")
class _fetch_iface_class(SlotObject):
__slots__ = ("log_file", "schedule")
@@ -7966,7 +7951,7 @@ class Scheduler(PollLoop):
schedule=self._schedule_fetch)
self._sched_iface = self._iface_class(
fetch=fetch_iface, register=self._register,
- schedule=self._schedule)
+ schedule=self._schedule, unregister=self._unregister)
self._task_queues = self._task_queues_class()
for k in self._task_queues.allowed_keys:
@@ -8535,24 +8520,30 @@ class Scheduler(PollLoop):
if self._is_restart_scheduled():
self._set_max_jobs(1)
- pkg_queue = self._pkg_queue
- failed_pkgs = self._failed_pkgs
+ while not self._failed_pkgs and \
+ self._schedule_tasks():
+ self._poll_loop()
+
+ while self._jobs:
+ self._poll_loop()
+
+ def _schedule_tasks(self):
+ """
+ @rtype: bool
+ @returns: True if tasks remain to schedule, False otherwise.
+ """
+
task_queues = self._task_queues
- max_jobs = self._max_jobs
- max_load = self._max_load
- background = max_jobs > 1
+ background = self._max_jobs > 1
- while pkg_queue and not failed_pkgs:
+ while self._can_add_job():
- if not self._can_add_job():
- self._schedule_main()
- continue
+ if not self._pkg_queue:
+ return False
pkg = self._choose_pkg()
-
if pkg is None:
- self._schedule_main()
- continue
+ return True
if not pkg.installed:
self._pkg_count.curval += 1
@@ -8570,16 +8561,7 @@ class Scheduler(PollLoop):
else:
task.addExitListener(self._build_exit)
task_queues.jobs.add(task)
-
- while self._jobs:
- self._schedule_main(wait=True)
-
- def _schedule_tasks(self):
- state_change = 0
- for x in self._task_queues.values():
- if x.schedule():
- state_change += 1
- return bool(state_change)
+ return True
def _task(self, pkg, background):
@@ -8747,7 +8729,7 @@ class Scheduler(PollLoop):
class MetadataRegen(PollLoop):
class _sched_iface_class(SlotObject):
- __slots__ = ("register", "schedule")
+ __slots__ = ("register", "schedule", "unregister")
def __init__(self, portdb, max_jobs=None, max_load=None):
PollLoop.__init__(self)
@@ -8756,14 +8738,15 @@ class MetadataRegen(PollLoop):
if max_jobs is None:
max_jobs = 1
- self._job_queue = SequentialTaskQueue(max_jobs=max_jobs)
self._max_jobs = max_jobs
self._max_load = max_load
self._sched_iface = self._sched_iface_class(
register=self._register,
- schedule=self._schedule)
+ schedule=self._schedule,
+ unregister=self._unregister)
self._valid_pkgs = set()
+ self._process_iter = self._iter_metadata_processes()
def _iter_metadata_processes(self):
portdb = self._portdb
@@ -8800,7 +8783,11 @@ class MetadataRegen(PollLoop):
dead_nodes = None
break
- self._main_loop()
+ while self._schedule_tasks():
+ self._poll_loop()
+
+ while self._jobs:
+ self._poll_loop()
if dead_nodes:
for y in self._valid_pkgs:
@@ -8816,31 +8803,23 @@ class MetadataRegen(PollLoop):
except (KeyError, CacheError):
pass
- def _main_loop(self):
-
- process_iter = self._iter_metadata_processes()
-
- while True:
-
- if not self._can_add_job():
- self._schedule_main()
- continue
-
+ def _schedule_tasks(self):
+ """
+ @rtype: bool
+ @returns: True if there may be remaining tasks to schedule,
+ False otherwise.
+ """
+ while self._can_add_job():
try:
- metadata_process = process_iter.next()
+ metadata_process = self._process_iter.next()
except StopIteration:
- break
+ return False
self._jobs += 1
metadata_process.scheduler = self._sched_iface
metadata_process.addExitListener(self._metadata_exit)
- self._job_queue.add(metadata_process)
-
- while self._jobs:
- self._schedule_main(wait=True)
-
- def _schedule_tasks(self):
- return self._job_queue.schedule()
+ metadata_process.start()
+ return True
def _metadata_exit(self, metadata_process):
self._jobs -= 1
@@ -8848,6 +8827,7 @@ class MetadataRegen(PollLoop):
self._valid_pkgs.discard(metadata_process.cpv)
portage.writemsg("Error processing %s, continuing...\n" % \
(metadata_process.cpv,))
+ self._schedule_tasks()
class UninstallFailure(portage.exception.PortageException):
"""