summaryrefslogtreecommitdiffstats
path: root/pym/_emerge/__init__.py
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2009-06-22 18:47:26 +0000
committerZac Medico <zmedico@gentoo.org>2009-06-22 18:47:26 +0000
commitbf9282b6782ad433b2ca905a5131bd0c424a2d94 (patch)
tree07e4481a6a7bef331e8f463b3a7ce1b40fe5e4d0 /pym/_emerge/__init__.py
parent69500d28402817fb70c104501b5c8f4f54650f4a (diff)
downloadportage-bf9282b6782ad433b2ca905a5131bd0c424a2d94.tar.gz
portage-bf9282b6782ad433b2ca905a5131bd0c424a2d94.tar.bz2
portage-bf9282b6782ad433b2ca905a5131bd0c424a2d94.zip
Bug #275047 - Split _emerge/__init__.py into smaller pieces (part 3).
Thanks to Sebastian Mingramm (few) <s.mingramm@gmx.de> for this patch. svn path=/main/trunk/; revision=13668
Diffstat (limited to 'pym/_emerge/__init__.py')
-rw-r--r--pym/_emerge/__init__.py341
1 files changed, 1 insertions, 340 deletions
diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py
index 3a3dac25e..de5642da9 100644
--- a/pym/_emerge/__init__.py
+++ b/pym/_emerge/__init__.py
@@ -3,10 +3,8 @@
# Distributed under the terms of the GNU General Public License v2
# $Id$
-import formatter
import logging
import pwd
-import select
import shlex
import signal
import sys
@@ -61,7 +59,6 @@ from _emerge.DepPriorityNormalRange import DepPriorityNormalRange
from _emerge.DepPrioritySatisfiedRange import DepPrioritySatisfiedRange
from _emerge.Task import Task
from _emerge.Blocker import Blocker
-from _emerge.PollConstants import PollConstants
from _emerge.AsynchronousTask import AsynchronousTask
from _emerge.CompositeTask import CompositeTask
from _emerge.EbuildFetcher import EbuildFetcher
@@ -80,13 +77,12 @@ from _emerge.BlockerCache import BlockerCache
from _emerge.PackageVirtualDbapi import PackageVirtualDbapi
from _emerge.RepoDisplay import RepoDisplay
from _emerge.UseFlagDisplay import UseFlagDisplay
-from _emerge.PollSelectAdapter import PollSelectAdapter
from _emerge.SequentialTaskQueue import SequentialTaskQueue
from _emerge.ProgressHandler import ProgressHandler
from _emerge.stdout_spinner import stdout_spinner
from _emerge.UninstallFailure import UninstallFailure
from _emerge.JobStatusDisplay import JobStatusDisplay
-from _emerge.getloadavg import getloadavg
+from _emerge.PollScheduler import PollScheduler
def userquery(prompt, responses=None, colours=None):
"""Displays a prompt and a set of responses, then waits for a response
@@ -6445,341 +6441,6 @@ class PackageCounters(object):
(self.blocks - self.blocks_satisfied))
return "".join(myoutput)
-
-_can_poll_device = None
-
-def can_poll_device():
- """
- Test if it's possible to use poll() on a device such as a pty. This
- is known to fail on Darwin.
- @rtype: bool
- @returns: True if poll() on a device succeeds, False otherwise.
- """
-
- global _can_poll_device
- if _can_poll_device is not None:
- return _can_poll_device
-
- if not hasattr(select, "poll"):
- _can_poll_device = False
- return _can_poll_device
-
- try:
- dev_null = open('/dev/null', 'rb')
- except IOError:
- _can_poll_device = False
- return _can_poll_device
-
- p = select.poll()
- p.register(dev_null.fileno(), PollConstants.POLLIN)
-
- invalid_request = False
- for f, event in p.poll():
- if event & PollConstants.POLLNVAL:
- invalid_request = True
- break
- dev_null.close()
-
- _can_poll_device = not invalid_request
- return _can_poll_device
-
-def create_poll_instance():
- """
- Create an instance of select.poll, or an instance of
- PollSelectAdapter there is no poll() implementation or
- it is broken somehow.
- """
- if can_poll_device():
- return select.poll()
- return PollSelectAdapter()
-
-class PollScheduler(object):
-
- class _sched_iface_class(SlotObject):
- __slots__ = ("register", "schedule", "unregister")
-
- def __init__(self):
- self._max_jobs = 1
- self._max_load = None
- self._jobs = 0
- self._poll_event_queue = []
- self._poll_event_handlers = {}
- self._poll_event_handler_ids = {}
- # Increment id for each new handler.
- self._event_handler_id = 0
- self._poll_obj = create_poll_instance()
- self._scheduling = False
-
- def _schedule(self):
- """
- Calls _schedule_tasks() and automatically returns early from
- any recursive calls to this method that the _schedule_tasks()
- call might trigger. This makes _schedule() safe to call from
- inside exit listeners.
- """
- if self._scheduling:
- return False
- self._scheduling = True
- try:
- return self._schedule_tasks()
- finally:
- self._scheduling = False
-
- def _running_job_count(self):
- return self._jobs
-
- def _can_add_job(self):
- max_jobs = self._max_jobs
- max_load = self._max_load
-
- if self._max_jobs is not True and \
- self._running_job_count() >= self._max_jobs:
- return False
-
- if max_load is not None and \
- (max_jobs is True or max_jobs > 1) and \
- self._running_job_count() >= 1:
- try:
- avg1, avg5, avg15 = getloadavg()
- except OSError:
- return False
-
- if avg1 >= max_load:
- return False
-
- return True
-
- def _poll(self, timeout=None):
- """
- All poll() calls pass through here. The poll events
- are added directly to self._poll_event_queue.
- In order to avoid endless blocking, this raises
- StopIteration if timeout is None and there are
- no file descriptors to poll.
- """
- if not self._poll_event_handlers:
- self._schedule()
- if timeout is None and \
- not self._poll_event_handlers:
- raise StopIteration(
- "timeout is None and there are no poll() event handlers")
-
- # The following error is known to occur with Linux kernel versions
- # less than 2.6.24:
- #
- # select.error: (4, 'Interrupted system call')
- #
- # This error has been observed after a SIGSTOP, followed by SIGCONT.
- # Treat it similar to EAGAIN if timeout is None, otherwise just return
- # without any events.
- while True:
- try:
- self._poll_event_queue.extend(self._poll_obj.poll(timeout))
- break
- except select.error, e:
- writemsg_level("\n!!! select error: %s\n" % (e,),
- level=logging.ERROR, noiselevel=-1)
- del e
- if timeout is not None:
- break
-
- def _next_poll_event(self, timeout=None):
- """
- Since the _schedule_wait() loop is called by event
- handlers from _poll_loop(), maintain a central event
- queue for both of them to share events from a single
- poll() call. In order to avoid endless blocking, this
- raises StopIteration if timeout is None and there are
- no file descriptors to poll.
- """
- if not self._poll_event_queue:
- self._poll(timeout)
- return self._poll_event_queue.pop()
-
- def _poll_loop(self):
-
- event_handlers = self._poll_event_handlers
- event_handled = False
-
- try:
- while event_handlers:
- f, event = self._next_poll_event()
- handler, reg_id = event_handlers[f]
- handler(f, event)
- event_handled = True
- except StopIteration:
- event_handled = True
-
- if not event_handled:
- raise AssertionError("tight loop")
-
- def _schedule_yield(self):
- """
- Schedule for a short period of time chosen by the scheduler based
- on internal state. Synchronous tasks should call this periodically
- in order to allow the scheduler to service pending poll events. The
- scheduler will call poll() exactly once, without blocking, and any
- resulting poll events will be serviced.
- """
- event_handlers = self._poll_event_handlers
- events_handled = 0
-
- if not event_handlers:
- return bool(events_handled)
-
- if not self._poll_event_queue:
- self._poll(0)
-
- try:
- while event_handlers and self._poll_event_queue:
- f, event = self._next_poll_event()
- handler, reg_id = event_handlers[f]
- handler(f, event)
- events_handled += 1
- except StopIteration:
- events_handled += 1
-
- return bool(events_handled)
-
- def _register(self, f, eventmask, handler):
- """
- @rtype: Integer
- @return: A unique registration id, for use in schedule() or
- unregister() calls.
- """
- if f in self._poll_event_handlers:
- raise AssertionError("fd %d is already registered" % f)
- self._event_handler_id += 1
- reg_id = self._event_handler_id
- self._poll_event_handler_ids[reg_id] = f
- self._poll_event_handlers[f] = (handler, reg_id)
- self._poll_obj.register(f, eventmask)
- return reg_id
-
- def _unregister(self, reg_id):
- f = self._poll_event_handler_ids[reg_id]
- self._poll_obj.unregister(f)
- del self._poll_event_handlers[f]
- del self._poll_event_handler_ids[reg_id]
-
- def _schedule_wait(self, wait_ids):
- """
- Schedule until wait_id is not longer registered
- for poll() events.
- @type wait_id: int
- @param wait_id: a task id to wait for
- """
- event_handlers = self._poll_event_handlers
- handler_ids = self._poll_event_handler_ids
- event_handled = False
-
- if isinstance(wait_ids, int):
- wait_ids = frozenset([wait_ids])
-
- try:
- while wait_ids.intersection(handler_ids):
- f, event = self._next_poll_event()
- handler, reg_id = event_handlers[f]
- handler(f, event)
- event_handled = True
- except StopIteration:
- event_handled = True
-
- return event_handled
-
-class QueueScheduler(PollScheduler):
-
- """
- Add instances of SequentialTaskQueue and then call run(). The
- run() method returns when no tasks remain.
- """
-
- def __init__(self, max_jobs=None, max_load=None):
- PollScheduler.__init__(self)
-
- if max_jobs is None:
- max_jobs = 1
-
- self._max_jobs = max_jobs
- self._max_load = max_load
- self.sched_iface = self._sched_iface_class(
- register=self._register,
- schedule=self._schedule_wait,
- unregister=self._unregister)
-
- self._queues = []
- self._schedule_listeners = []
-
- def add(self, q):
- self._queues.append(q)
-
- def remove(self, q):
- self._queues.remove(q)
-
- def run(self):
-
- while self._schedule():
- self._poll_loop()
-
- while self._running_job_count():
- self._poll_loop()
-
- def _schedule_tasks(self):
- """
- @rtype: bool
- @returns: True if there may be remaining tasks to schedule,
- False otherwise.
- """
- while self._can_add_job():
- n = self._max_jobs - self._running_job_count()
- if n < 1:
- break
-
- if not self._start_next_job(n):
- return False
-
- for q in self._queues:
- if q:
- return True
- return False
-
- def _running_job_count(self):
- job_count = 0
- for q in self._queues:
- job_count += len(q.running_tasks)
- self._jobs = job_count
- return job_count
-
- def _start_next_job(self, n=1):
- started_count = 0
- for q in self._queues:
- initial_job_count = len(q.running_tasks)
- q.schedule()
- final_job_count = len(q.running_tasks)
- if final_job_count > initial_job_count:
- started_count += (final_job_count - initial_job_count)
- if started_count >= n:
- break
- return started_count
-
-class TaskScheduler(object):
-
- """
- A simple way to handle scheduling of AsynchrousTask instances. Simply
- add tasks and call run(). The run() method returns when no tasks remain.
- """
-
- def __init__(self, max_jobs=None, max_load=None):
- self._queue = SequentialTaskQueue(max_jobs=max_jobs)
- self._scheduler = QueueScheduler(
- max_jobs=max_jobs, max_load=max_load)
- self.sched_iface = self._scheduler.sched_iface
- self.run = self._scheduler.run
- self._scheduler.add(self._queue)
-
- def add(self, task):
- self._queue.add(task)
-
class Scheduler(PollScheduler):
_opts_ignore_blockers = \