author     Zac Medico <zmedico@gentoo.org>    2009-06-22 18:47:26 +0000
committer  Zac Medico <zmedico@gentoo.org>    2009-06-22 18:47:26 +0000
commit     bf9282b6782ad433b2ca905a5131bd0c424a2d94 (patch)
tree       07e4481a6a7bef331e8f463b3a7ce1b40fe5e4d0 /pym/_emerge/PollScheduler.py
parent     69500d28402817fb70c104501b5c8f4f54650f4a (diff)
Bug #275047 - Split _emerge/__init__.py into smaller pieces (part 3).
Thanks to Sebastian Mingramm (few) <s.mingramm@gmx.de> for this patch.
svn path=/main/trunk/; revision=13668
Diffstat (limited to 'pym/_emerge/PollScheduler.py')
-rw-r--r--  pym/_emerge/PollScheduler.py  251
1 files changed, 251 insertions, 0 deletions
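
Editorial note: the new file wraps the standard select.poll() register/poll/dispatch pattern behind an event-handler registry. As orientation, here is a minimal, self-contained sketch of that underlying pattern (an editorial illustration only; the pipe, handler, and variable names are invented and not part of this commit), assuming a platform where select.poll() is available:

	import os
	import select

	# A pipe stands in for the pty/pipe descriptors the scheduler watches.
	r, w = os.pipe()

	poll_obj = select.poll()
	poll_obj.register(r, select.POLLIN)

	def handle_input(fd, event):
		# Drain whatever is ready on fd.
		data = os.read(fd, 4096)
		print("got %r" % (data,))

	# Map fd -> handler, the same bookkeeping PollScheduler keeps in
	# self._poll_event_handlers.
	handlers = {r: handle_input}

	os.write(w, b"hello")

	# One non-blocking poll() pass, dispatching each (fd, eventmask) pair.
	for fd, event in poll_obj.poll(0):
		handlers[fd](fd, event)

	os.close(r)
	os.close(w)

PollScheduler, added below, layers the missing pieces on top of this: stable registration ids, a shared event queue, and StopIteration-based protection against blocking forever when no descriptors are registered. A usage-level sketch of the subclass interface follows the diff.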
diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py
new file mode 100644
index 000000000..3a23877f9
--- /dev/null
+++ b/pym/_emerge/PollScheduler.py
@@ -0,0 +1,251 @@
+import logging
+import select
+
+from portage.util import writemsg_level
+
+from _emerge.SlotObject import SlotObject
+from _emerge.getloadavg import getloadavg
+from _emerge.PollConstants import PollConstants
+from _emerge.PollSelectAdapter import PollSelectAdapter
+
+class PollScheduler(object):
+
+	class _sched_iface_class(SlotObject):
+		__slots__ = ("register", "schedule", "unregister")
+
+	def __init__(self):
+		self._max_jobs = 1
+		self._max_load = None
+		self._jobs = 0
+		self._poll_event_queue = []
+		self._poll_event_handlers = {}
+		self._poll_event_handler_ids = {}
+		# Increment id for each new handler.
+		self._event_handler_id = 0
+		self._poll_obj = create_poll_instance()
+		self._scheduling = False
+
+	def _schedule(self):
+		"""
+		Calls _schedule_tasks() and automatically returns early from
+		any recursive calls to this method that the _schedule_tasks()
+		call might trigger. This makes _schedule() safe to call from
+		inside exit listeners.
+		"""
+		if self._scheduling:
+			return False
+		self._scheduling = True
+		try:
+			return self._schedule_tasks()
+		finally:
+			self._scheduling = False
+
+	def _running_job_count(self):
+		return self._jobs
+
+	def _can_add_job(self):
+		max_jobs = self._max_jobs
+		max_load = self._max_load
+
+		if self._max_jobs is not True and \
+			self._running_job_count() >= self._max_jobs:
+			return False
+
+		if max_load is not None and \
+			(max_jobs is True or max_jobs > 1) and \
+			self._running_job_count() >= 1:
+			try:
+				avg1, avg5, avg15 = getloadavg()
+			except OSError:
+				return False
+
+			if avg1 >= max_load:
+				return False
+
+		return True
+
+	def _poll(self, timeout=None):
+		"""
+		All poll() calls pass through here. The poll events
+		are added directly to self._poll_event_queue.
+		In order to avoid endless blocking, this raises
+		StopIteration if timeout is None and there are
+		no file descriptors to poll.
+		"""
+		if not self._poll_event_handlers:
+			self._schedule()
+			if timeout is None and \
+				not self._poll_event_handlers:
+				raise StopIteration(
+					"timeout is None and there are no poll() event handlers")
+
+		# The following error is known to occur with Linux kernel versions
+		# less than 2.6.24:
+		#
+		#   select.error: (4, 'Interrupted system call')
+		#
+		# This error has been observed after a SIGSTOP, followed by SIGCONT.
+		# Treat it similar to EAGAIN if timeout is None, otherwise just return
+		# without any events.
+		while True:
+			try:
+				self._poll_event_queue.extend(self._poll_obj.poll(timeout))
+				break
+			except select.error, e:
+				writemsg_level("\n!!! select error: %s\n" % (e,),
+					level=logging.ERROR, noiselevel=-1)
+				del e
+				if timeout is not None:
+					break
+
+	def _next_poll_event(self, timeout=None):
+		"""
+		Since the _schedule_wait() loop is called by event
+		handlers from _poll_loop(), maintain a central event
+		queue for both of them to share events from a single
+		poll() call. In order to avoid endless blocking, this
+		raises StopIteration if timeout is None and there are
+		no file descriptors to poll.
+		"""
+		if not self._poll_event_queue:
+			self._poll(timeout)
+		return self._poll_event_queue.pop()
+
+	def _poll_loop(self):
+
+		event_handlers = self._poll_event_handlers
+		event_handled = False
+
+		try:
+			while event_handlers:
+				f, event = self._next_poll_event()
+				handler, reg_id = event_handlers[f]
+				handler(f, event)
+				event_handled = True
+		except StopIteration:
+			event_handled = True
+
+		if not event_handled:
+			raise AssertionError("tight loop")
+
+	def _schedule_yield(self):
+		"""
+		Schedule for a short period of time chosen by the scheduler based
+		on internal state. Synchronous tasks should call this periodically
+		in order to allow the scheduler to service pending poll events. The
+		scheduler will call poll() exactly once, without blocking, and any
+		resulting poll events will be serviced.
+		"""
+		event_handlers = self._poll_event_handlers
+		events_handled = 0
+
+		if not event_handlers:
+			return bool(events_handled)
+
+		if not self._poll_event_queue:
+			self._poll(0)
+
+		try:
+			while event_handlers and self._poll_event_queue:
+				f, event = self._next_poll_event()
+				handler, reg_id = event_handlers[f]
+				handler(f, event)
+				events_handled += 1
+		except StopIteration:
+			events_handled += 1
+
+		return bool(events_handled)
+
+	def _register(self, f, eventmask, handler):
+		"""
+		@rtype: Integer
+		@return: A unique registration id, for use in schedule() or
+			unregister() calls.
+		"""
+		if f in self._poll_event_handlers:
+			raise AssertionError("fd %d is already registered" % f)
+		self._event_handler_id += 1
+		reg_id = self._event_handler_id
+		self._poll_event_handler_ids[reg_id] = f
+		self._poll_event_handlers[f] = (handler, reg_id)
+		self._poll_obj.register(f, eventmask)
+		return reg_id
+
+	def _unregister(self, reg_id):
+		f = self._poll_event_handler_ids[reg_id]
+		self._poll_obj.unregister(f)
+		del self._poll_event_handlers[f]
+		del self._poll_event_handler_ids[reg_id]
+
+	def _schedule_wait(self, wait_ids):
+		"""
+		Schedule until wait_ids is no longer registered
+		for poll() events.
+		@type wait_ids: int or collection of ints
+		@param wait_ids: one or more task ids to wait for
+		"""
+		event_handlers = self._poll_event_handlers
+		handler_ids = self._poll_event_handler_ids
+		event_handled = False
+
+		if isinstance(wait_ids, int):
+			wait_ids = frozenset([wait_ids])
+
+		try:
+			while wait_ids.intersection(handler_ids):
+				f, event = self._next_poll_event()
+				handler, reg_id = event_handlers[f]
+				handler(f, event)
+				event_handled = True
+		except StopIteration:
+			event_handled = True
+
+		return event_handled
+
+
+_can_poll_device = None
+
+def can_poll_device():
+ """
+ Test if it's possible to use poll() on a device such as a pty. This
+ is known to fail on Darwin.
+ @rtype: bool
+ @returns: True if poll() on a device succeeds, False otherwise.
+ """
+
+ global _can_poll_device
+ if _can_poll_device is not None:
+ return _can_poll_device
+
+ if not hasattr(select, "poll"):
+ _can_poll_device = False
+ return _can_poll_device
+
+ try:
+ dev_null = open('/dev/null', 'rb')
+ except IOError:
+ _can_poll_device = False
+ return _can_poll_device
+
+ p = select.poll()
+ p.register(dev_null.fileno(), PollConstants.POLLIN)
+
+ invalid_request = False
+ for f, event in p.poll():
+ if event & PollConstants.POLLNVAL:
+ invalid_request = True
+ break
+ dev_null.close()
+
+ _can_poll_device = not invalid_request
+ return _can_poll_device
+
+def create_poll_instance():
+	"""
+	Create an instance of select.poll, or an instance of
+	PollSelectAdapter if there is no poll() implementation or
+	it is broken somehow.
+	"""
+	if can_poll_device():
+		return select.poll()
+	return PollSelectAdapter()
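
Editorial note: to show how the protected interface added above is meant to fit together, here is a hypothetical subclass. PipeWatcher, _input_handler, and wait() are invented names for illustration and do not appear in this commit; it is only a sketch of the registration lifecycle (_register() a descriptor, dispatch events via _schedule_wait(), then _unregister() when done).

	import os

	from _emerge.PollScheduler import PollScheduler
	from _emerge.PollConstants import PollConstants

	class PipeWatcher(PollScheduler):
		"""Hypothetical example: dispatch poll events for one pipe until EOF."""

		def __init__(self, fd):
			PollScheduler.__init__(self)
			self._fd = fd
			self._reg_id = self._register(fd,
				PollConstants.POLLIN, self._input_handler)

		def _schedule_tasks(self):
			# Hook called by _schedule(); this example has no jobs to start.
			return False

		def _input_handler(self, fd, event):
			data = os.read(fd, 4096)
			if not data:
				# EOF: drop the registration so _schedule_wait() can return.
				self._unregister(self._reg_id)

		def wait(self):
			# Block, servicing poll events, until our registration is gone.
			return self._schedule_wait(self._reg_id)

A concrete subclass elsewhere in _emerge would follow the same shape, and must supply _schedule_tasks(), since _schedule() calls it unconditionally.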