From f202826c41db708712a1847c11f2298b1314b2d9 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Thu, 17 Jul 2008 22:41:04 +0000 Subject: When dblink is looping over files for merge/unmerge, temporarily yield to the scheduler each time a fixed number of files are processed (currently 20). This gives the scheduler an opportunity to service pending poll events. This is implemented with a new PollScheduler._schedule_yield() method which calls poll() exactly once, without blocking, and any services any resulting poll events. svn path=/main/trunk/; revision=11116 --- pym/_emerge/__init__.py | 48 +++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 43 insertions(+), 5 deletions(-) (limited to 'pym/_emerge') diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py index 94218e25a..b6e942963 100644 --- a/pym/_emerge/__init__.py +++ b/pym/_emerge/__init__.py @@ -7958,7 +7958,19 @@ class PollSelectAdapter(PollConstants): if timeout is not None: select_args = select_args[:] - select_args.append(timeout) + # Translate poll() timeout args to select() timeout args: + # + # | units | value(s) for indefinite block + # ---------|--------------|------------------------------ + # poll | milliseconds | omitted, negative, or None + # ---------|--------------|------------------------------ + # select | seconds | omitted + # ---------|--------------|------------------------------ + + if timeout is not None and timeout < 0: + timeout = None + if timeout is not None: + select_args.append(timeout / 1000) select_events = select.select(*select_args) poll_events = [] @@ -8145,7 +8157,7 @@ class PollScheduler(object): return True - def _next_poll_event(self): + def _next_poll_event(self, timeout=None): """ Since the _schedule_wait() loop is called by event handlers from _poll_loop(), maintain a central event @@ -8153,7 +8165,7 @@ class PollScheduler(object): poll() call. """ if not self._poll_event_queue: - self._poll_event_queue.extend(self._poll.poll()) + self._poll_event_queue.extend(self._poll.poll(timeout)) return self._poll_event_queue.pop() def _poll_loop(self): @@ -8170,6 +8182,31 @@ class PollScheduler(object): if not event_handled: raise AssertionError("tight loop") + def _schedule_yield(self): + """ + Schedule for a short period of time chosen by the scheduler based + on internal state. Synchronous tasks should call this periodically + in order to allow the scheduler to service pending poll events. The + scheduler will call poll() exactly once, without blocking, and any + resulting poll events will be serviced. + """ + event_handlers = self._poll_event_handlers + events_handled = 0 + + if not event_handlers: + return bool(events_handled) + + if not self._poll_event_queue: + self._poll_event_queue.extend(self._poll.poll(0)) + + while event_handlers and self._poll_event_queue: + f, event = self._next_poll_event() + handler, reg_id = event_handlers[f] + handler(f, event) + events_handled += 1 + + return bool(events_handled) + def _register(self, f, eventmask, handler): """ @rtype: Integer @@ -8429,7 +8466,7 @@ class Scheduler(PollScheduler): class _iface_class(SlotObject): __slots__ = ("dblinkEbuildPhase", "dblinkDisplayMerge", "fetch", - "register", "schedule", "unregister") + "register", "schedule", "scheduleYield", "unregister") class _fetch_iface_class(SlotObject): __slots__ = ("log_file", "schedule") @@ -8498,7 +8535,8 @@ class Scheduler(PollScheduler): dblinkEbuildPhase=self._dblink_ebuild_phase, dblinkDisplayMerge=self._dblink_display_merge, fetch=fetch_iface, register=self._register, - schedule=self._schedule_wait, unregister=self._unregister) + schedule=self._schedule_wait, scheduleYield=self._schedule_yield, + unregister=self._unregister) self._task_queues = self._task_queues_class() for k in self._task_queues.allowed_keys: -- cgit v1.2.3-1-g7c22