summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2008-07-17 22:41:04 +0000
committerZac Medico <zmedico@gentoo.org>2008-07-17 22:41:04 +0000
commitf202826c41db708712a1847c11f2298b1314b2d9 (patch)
tree088fbf35bdc00d18365b610d9c5567ca43361808
parent37ffbba1a6f77d177bb6535ddc6727703f12808a (diff)
downloadportage-f202826c41db708712a1847c11f2298b1314b2d9.tar.gz
portage-f202826c41db708712a1847c11f2298b1314b2d9.tar.bz2
portage-f202826c41db708712a1847c11f2298b1314b2d9.zip
When dblink is looping over files for merge/unmerge, temporarily yield to the
scheduler each time a fixed number of files are processed (currently 20). This gives the scheduler an opportunity to service pending poll events. This is implemented with a new PollScheduler._schedule_yield() method which calls poll() exactly once, without blocking, and any services any resulting poll events. svn path=/main/trunk/; revision=11116
-rw-r--r--pym/_emerge/__init__.py48
-rw-r--r--pym/portage/dbapi/vartree.py39
2 files changed, 76 insertions, 11 deletions
diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py
index 94218e25a..b6e942963 100644
--- a/pym/_emerge/__init__.py
+++ b/pym/_emerge/__init__.py
@@ -7958,7 +7958,19 @@ class PollSelectAdapter(PollConstants):
if timeout is not None:
select_args = select_args[:]
- select_args.append(timeout)
+ # Translate poll() timeout args to select() timeout args:
+ #
+ # | units | value(s) for indefinite block
+ # ---------|--------------|------------------------------
+ # poll | milliseconds | omitted, negative, or None
+ # ---------|--------------|------------------------------
+ # select | seconds | omitted
+ # ---------|--------------|------------------------------
+
+ if timeout is not None and timeout < 0:
+ timeout = None
+ if timeout is not None:
+ select_args.append(timeout / 1000)
select_events = select.select(*select_args)
poll_events = []
@@ -8145,7 +8157,7 @@ class PollScheduler(object):
return True
- def _next_poll_event(self):
+ def _next_poll_event(self, timeout=None):
"""
Since the _schedule_wait() loop is called by event
handlers from _poll_loop(), maintain a central event
@@ -8153,7 +8165,7 @@ class PollScheduler(object):
poll() call.
"""
if not self._poll_event_queue:
- self._poll_event_queue.extend(self._poll.poll())
+ self._poll_event_queue.extend(self._poll.poll(timeout))
return self._poll_event_queue.pop()
def _poll_loop(self):
@@ -8170,6 +8182,31 @@ class PollScheduler(object):
if not event_handled:
raise AssertionError("tight loop")
+ def _schedule_yield(self):
+ """
+ Schedule for a short period of time chosen by the scheduler based
+ on internal state. Synchronous tasks should call this periodically
+ in order to allow the scheduler to service pending poll events. The
+ scheduler will call poll() exactly once, without blocking, and any
+ resulting poll events will be serviced.
+ """
+ event_handlers = self._poll_event_handlers
+ events_handled = 0
+
+ if not event_handlers:
+ return bool(events_handled)
+
+ if not self._poll_event_queue:
+ self._poll_event_queue.extend(self._poll.poll(0))
+
+ while event_handlers and self._poll_event_queue:
+ f, event = self._next_poll_event()
+ handler, reg_id = event_handlers[f]
+ handler(f, event)
+ events_handled += 1
+
+ return bool(events_handled)
+
def _register(self, f, eventmask, handler):
"""
@rtype: Integer
@@ -8429,7 +8466,7 @@ class Scheduler(PollScheduler):
class _iface_class(SlotObject):
__slots__ = ("dblinkEbuildPhase", "dblinkDisplayMerge", "fetch",
- "register", "schedule", "unregister")
+ "register", "schedule", "scheduleYield", "unregister")
class _fetch_iface_class(SlotObject):
__slots__ = ("log_file", "schedule")
@@ -8498,7 +8535,8 @@ class Scheduler(PollScheduler):
dblinkEbuildPhase=self._dblink_ebuild_phase,
dblinkDisplayMerge=self._dblink_display_merge,
fetch=fetch_iface, register=self._register,
- schedule=self._schedule_wait, unregister=self._unregister)
+ schedule=self._schedule_wait, scheduleYield=self._schedule_yield,
+ unregister=self._unregister)
self._task_queues = self._task_queues_class()
for k in self._task_queues.allowed_keys:
diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py
index 32d83e25a..a6cebaab4 100644
--- a/pym/portage/dbapi/vartree.py
+++ b/pym/portage/dbapi/vartree.py
@@ -1283,6 +1283,10 @@ class dblink(object):
"sym": 5
}
+ # When looping over files for merge/unmerge, temporarily yield to the
+ # scheduler each time this many files are processed.
+ _file_merge_yield_interval = 20
+
def __init__(self, cat, pkg, myroot, mysettings, treetype=None,
vartree=None, blockers=None, scheduler=None):
"""
@@ -1775,6 +1779,7 @@ class dblink(object):
"""
showMessage = self._display_merge
+ scheduler = self._scheduler
if not pkgfiles:
showMessage("No package files given... Grabbing a set.\n")
@@ -1843,7 +1848,12 @@ class dblink(object):
def show_unmerge(zing, desc, file_type, file_name):
showMessage("%s %s %s %s\n" % \
(zing, desc.ljust(8), file_type, file_name))
- for objkey in mykeys:
+ for i, objkey in enumerate(mykeys):
+
+ if scheduler is not None and \
+ 0 == i % self._file_merge_yield_interval:
+ scheduler.scheduleYield()
+
obj = normalize_path(objkey)
file_data = pkgfiles[objkey]
file_type = file_data[0]
@@ -2217,17 +2227,21 @@ class dblink(object):
self.settings.get("COLLISION_IGNORE", "").split()])
showMessage = self._display_merge
+ scheduler = self._scheduler
stopmerge = False
- i=0
collisions = []
destroot = normalize_path(destroot).rstrip(os.path.sep) + \
os.path.sep
showMessage("%s checking %d files for package collisions\n" % \
(green("*"), len(mycontents)))
- for f in mycontents:
- i = i + 1
+ for i, f in enumerate(mycontents):
if i % 1000 == 0:
showMessage("%d files checked ...\n" % i)
+
+ if scheduler is not None and \
+ 0 == i % self._file_merge_yield_interval:
+ scheduler.scheduleYield()
+
dest_path = normalize_path(
os.path.join(destroot, f.lstrip(os.path.sep)))
try:
@@ -2287,13 +2301,19 @@ class dblink(object):
return 0
showMessage = self._display_merge
+ scheduler = self._scheduler
file_paths = set()
for dblnk in installed_instances:
file_paths.update(dblnk.getcontents())
inode_map = {}
real_paths = set()
- for path in file_paths:
+ for i, path in enumerate(file_paths):
+
+ if scheduler is not None and \
+ 0 == i % self._file_merge_yield_interval:
+ scheduler.scheduleYield()
+
try:
s = os.lstat(path)
except OSError, e:
@@ -2842,6 +2862,7 @@ class dblink(object):
"""
showMessage = self._display_merge
+ scheduler = self._scheduler
from os.path import sep, join
srcroot = normalize_path(srcroot).rstrip(sep) + sep
@@ -2855,7 +2876,13 @@ class dblink(object):
else:
mergelist = stufftomerge
offset = ""
- for x in mergelist:
+
+ for i, x in enumerate(mergelist):
+
+ if scheduler is not None and \
+ 0 == i % self._file_merge_yield_interval:
+ scheduler.scheduleYield()
+
mysrc = join(srcroot, offset, x)
mydest = join(destroot, offset, x)
# myrealdest is mydest without the $ROOT prefix (makes a difference if ROOT!="/")