diff options
author | Zac Medico <zmedico@gentoo.org> | 2009-06-22 18:47:26 +0000 |
---|---|---|
committer | Zac Medico <zmedico@gentoo.org> | 2009-06-22 18:47:26 +0000 |
commit | bf9282b6782ad433b2ca905a5131bd0c424a2d94 (patch) | |
tree | 07e4481a6a7bef331e8f463b3a7ce1b40fe5e4d0 /pym/_emerge/PollScheduler.py | |
parent | 69500d28402817fb70c104501b5c8f4f54650f4a (diff) | |
download | portage-bf9282b6782ad433b2ca905a5131bd0c424a2d94.tar.gz portage-bf9282b6782ad433b2ca905a5131bd0c424a2d94.tar.bz2 portage-bf9282b6782ad433b2ca905a5131bd0c424a2d94.zip |
Bug #275047 - Split _emerge/__init__.py into smaller pieces (part 3).
Thanks to Sebastian Mingramm (few) <s.mingramm@gmx.de> for this patch.
svn path=/main/trunk/; revision=13668
Diffstat (limited to 'pym/_emerge/PollScheduler.py')
-rw-r--r-- | pym/_emerge/PollScheduler.py | 251 |
1 files changed, 251 insertions, 0 deletions
diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py new file mode 100644 index 000000000..3a23877f9 --- /dev/null +++ b/pym/_emerge/PollScheduler.py @@ -0,0 +1,251 @@ +import logging +import select + +from portage.util import writemsg_level + +from _emerge.SlotObject import SlotObject +from _emerge.getloadavg import getloadavg +from _emerge.PollConstants import PollConstants +from _emerge.PollSelectAdapter import PollSelectAdapter + +class PollScheduler(object): + + class _sched_iface_class(SlotObject): + __slots__ = ("register", "schedule", "unregister") + + def __init__(self): + self._max_jobs = 1 + self._max_load = None + self._jobs = 0 + self._poll_event_queue = [] + self._poll_event_handlers = {} + self._poll_event_handler_ids = {} + # Increment id for each new handler. + self._event_handler_id = 0 + self._poll_obj = create_poll_instance() + self._scheduling = False + + def _schedule(self): + """ + Calls _schedule_tasks() and automatically returns early from + any recursive calls to this method that the _schedule_tasks() + call might trigger. This makes _schedule() safe to call from + inside exit listeners. + """ + if self._scheduling: + return False + self._scheduling = True + try: + return self._schedule_tasks() + finally: + self._scheduling = False + + def _running_job_count(self): + return self._jobs + + def _can_add_job(self): + max_jobs = self._max_jobs + max_load = self._max_load + + if self._max_jobs is not True and \ + self._running_job_count() >= self._max_jobs: + return False + + if max_load is not None and \ + (max_jobs is True or max_jobs > 1) and \ + self._running_job_count() >= 1: + try: + avg1, avg5, avg15 = getloadavg() + except OSError: + return False + + if avg1 >= max_load: + return False + + return True + + def _poll(self, timeout=None): + """ + All poll() calls pass through here. The poll events + are added directly to self._poll_event_queue. + In order to avoid endless blocking, this raises + StopIteration if timeout is None and there are + no file descriptors to poll. + """ + if not self._poll_event_handlers: + self._schedule() + if timeout is None and \ + not self._poll_event_handlers: + raise StopIteration( + "timeout is None and there are no poll() event handlers") + + # The following error is known to occur with Linux kernel versions + # less than 2.6.24: + # + # select.error: (4, 'Interrupted system call') + # + # This error has been observed after a SIGSTOP, followed by SIGCONT. + # Treat it similar to EAGAIN if timeout is None, otherwise just return + # without any events. + while True: + try: + self._poll_event_queue.extend(self._poll_obj.poll(timeout)) + break + except select.error, e: + writemsg_level("\n!!! select error: %s\n" % (e,), + level=logging.ERROR, noiselevel=-1) + del e + if timeout is not None: + break + + def _next_poll_event(self, timeout=None): + """ + Since the _schedule_wait() loop is called by event + handlers from _poll_loop(), maintain a central event + queue for both of them to share events from a single + poll() call. In order to avoid endless blocking, this + raises StopIteration if timeout is None and there are + no file descriptors to poll. + """ + if not self._poll_event_queue: + self._poll(timeout) + return self._poll_event_queue.pop() + + def _poll_loop(self): + + event_handlers = self._poll_event_handlers + event_handled = False + + try: + while event_handlers: + f, event = self._next_poll_event() + handler, reg_id = event_handlers[f] + handler(f, event) + event_handled = True + except StopIteration: + event_handled = True + + if not event_handled: + raise AssertionError("tight loop") + + def _schedule_yield(self): + """ + Schedule for a short period of time chosen by the scheduler based + on internal state. Synchronous tasks should call this periodically + in order to allow the scheduler to service pending poll events. The + scheduler will call poll() exactly once, without blocking, and any + resulting poll events will be serviced. + """ + event_handlers = self._poll_event_handlers + events_handled = 0 + + if not event_handlers: + return bool(events_handled) + + if not self._poll_event_queue: + self._poll(0) + + try: + while event_handlers and self._poll_event_queue: + f, event = self._next_poll_event() + handler, reg_id = event_handlers[f] + handler(f, event) + events_handled += 1 + except StopIteration: + events_handled += 1 + + return bool(events_handled) + + def _register(self, f, eventmask, handler): + """ + @rtype: Integer + @return: A unique registration id, for use in schedule() or + unregister() calls. + """ + if f in self._poll_event_handlers: + raise AssertionError("fd %d is already registered" % f) + self._event_handler_id += 1 + reg_id = self._event_handler_id + self._poll_event_handler_ids[reg_id] = f + self._poll_event_handlers[f] = (handler, reg_id) + self._poll_obj.register(f, eventmask) + return reg_id + + def _unregister(self, reg_id): + f = self._poll_event_handler_ids[reg_id] + self._poll_obj.unregister(f) + del self._poll_event_handlers[f] + del self._poll_event_handler_ids[reg_id] + + def _schedule_wait(self, wait_ids): + """ + Schedule until wait_id is not longer registered + for poll() events. + @type wait_id: int + @param wait_id: a task id to wait for + """ + event_handlers = self._poll_event_handlers + handler_ids = self._poll_event_handler_ids + event_handled = False + + if isinstance(wait_ids, int): + wait_ids = frozenset([wait_ids]) + + try: + while wait_ids.intersection(handler_ids): + f, event = self._next_poll_event() + handler, reg_id = event_handlers[f] + handler(f, event) + event_handled = True + except StopIteration: + event_handled = True + + return event_handled + + +_can_poll_device = None + +def can_poll_device(): + """ + Test if it's possible to use poll() on a device such as a pty. This + is known to fail on Darwin. + @rtype: bool + @returns: True if poll() on a device succeeds, False otherwise. + """ + + global _can_poll_device + if _can_poll_device is not None: + return _can_poll_device + + if not hasattr(select, "poll"): + _can_poll_device = False + return _can_poll_device + + try: + dev_null = open('/dev/null', 'rb') + except IOError: + _can_poll_device = False + return _can_poll_device + + p = select.poll() + p.register(dev_null.fileno(), PollConstants.POLLIN) + + invalid_request = False + for f, event in p.poll(): + if event & PollConstants.POLLNVAL: + invalid_request = True + break + dev_null.close() + + _can_poll_device = not invalid_request + return _can_poll_device + +def create_poll_instance(): + """ + Create an instance of select.poll, or an instance of + PollSelectAdapter there is no poll() implementation or + it is broken somehow. + """ + if can_poll_device(): + return select.poll() + return PollSelectAdapter() |