summaryrefslogtreecommitdiffstats
path: root/pym
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2009-06-22 18:47:26 +0000
committerZac Medico <zmedico@gentoo.org>2009-06-22 18:47:26 +0000
commitbf9282b6782ad433b2ca905a5131bd0c424a2d94 (patch)
tree07e4481a6a7bef331e8f463b3a7ce1b40fe5e4d0 /pym
parent69500d28402817fb70c104501b5c8f4f54650f4a (diff)
downloadportage-bf9282b6782ad433b2ca905a5131bd0c424a2d94.tar.gz
portage-bf9282b6782ad433b2ca905a5131bd0c424a2d94.tar.bz2
portage-bf9282b6782ad433b2ca905a5131bd0c424a2d94.zip
Bug #275047 - Split _emerge/__init__.py into smaller pieces (part 3).
Thanks to Sebastian Mingramm (few) <s.mingramm@gmx.de> for this patch. svn path=/main/trunk/; revision=13668
Diffstat (limited to 'pym')
-rw-r--r--pym/_emerge/PollScheduler.py251
-rw-r--r--pym/_emerge/QueueScheduler.py77
-rw-r--r--pym/_emerge/TaskScheduler.py21
-rw-r--r--pym/_emerge/__init__.py341
-rw-r--r--pym/portage/tests/process/test_poll.py2
5 files changed, 351 insertions, 341 deletions
diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py
new file mode 100644
index 000000000..3a23877f9
--- /dev/null
+++ b/pym/_emerge/PollScheduler.py
@@ -0,0 +1,251 @@
+import logging
+import select
+
+from portage.util import writemsg_level
+
+from _emerge.SlotObject import SlotObject
+from _emerge.getloadavg import getloadavg
+from _emerge.PollConstants import PollConstants
+from _emerge.PollSelectAdapter import PollSelectAdapter
+
+class PollScheduler(object):
+
+ class _sched_iface_class(SlotObject):
+ __slots__ = ("register", "schedule", "unregister")
+
+ def __init__(self):
+ self._max_jobs = 1
+ self._max_load = None
+ self._jobs = 0
+ self._poll_event_queue = []
+ self._poll_event_handlers = {}
+ self._poll_event_handler_ids = {}
+ # Increment id for each new handler.
+ self._event_handler_id = 0
+ self._poll_obj = create_poll_instance()
+ self._scheduling = False
+
+ def _schedule(self):
+ """
+ Calls _schedule_tasks() and automatically returns early from
+ any recursive calls to this method that the _schedule_tasks()
+ call might trigger. This makes _schedule() safe to call from
+ inside exit listeners.
+ """
+ if self._scheduling:
+ return False
+ self._scheduling = True
+ try:
+ return self._schedule_tasks()
+ finally:
+ self._scheduling = False
+
+ def _running_job_count(self):
+ return self._jobs
+
+ def _can_add_job(self):
+ max_jobs = self._max_jobs
+ max_load = self._max_load
+
+ if self._max_jobs is not True and \
+ self._running_job_count() >= self._max_jobs:
+ return False
+
+ if max_load is not None and \
+ (max_jobs is True or max_jobs > 1) and \
+ self._running_job_count() >= 1:
+ try:
+ avg1, avg5, avg15 = getloadavg()
+ except OSError:
+ return False
+
+ if avg1 >= max_load:
+ return False
+
+ return True
+
+ def _poll(self, timeout=None):
+ """
+ All poll() calls pass through here. The poll events
+ are added directly to self._poll_event_queue.
+ In order to avoid endless blocking, this raises
+ StopIteration if timeout is None and there are
+ no file descriptors to poll.
+ """
+ if not self._poll_event_handlers:
+ self._schedule()
+ if timeout is None and \
+ not self._poll_event_handlers:
+ raise StopIteration(
+ "timeout is None and there are no poll() event handlers")
+
+ # The following error is known to occur with Linux kernel versions
+ # less than 2.6.24:
+ #
+ # select.error: (4, 'Interrupted system call')
+ #
+ # This error has been observed after a SIGSTOP, followed by SIGCONT.
+ # Treat it similar to EAGAIN if timeout is None, otherwise just return
+ # without any events.
+ while True:
+ try:
+ self._poll_event_queue.extend(self._poll_obj.poll(timeout))
+ break
+ except select.error, e:
+ writemsg_level("\n!!! select error: %s\n" % (e,),
+ level=logging.ERROR, noiselevel=-1)
+ del e
+ if timeout is not None:
+ break
+
+ def _next_poll_event(self, timeout=None):
+ """
+ Since the _schedule_wait() loop is called by event
+ handlers from _poll_loop(), maintain a central event
+ queue for both of them to share events from a single
+ poll() call. In order to avoid endless blocking, this
+ raises StopIteration if timeout is None and there are
+ no file descriptors to poll.
+ """
+ if not self._poll_event_queue:
+ self._poll(timeout)
+ return self._poll_event_queue.pop()
+
+ def _poll_loop(self):
+
+ event_handlers = self._poll_event_handlers
+ event_handled = False
+
+ try:
+ while event_handlers:
+ f, event = self._next_poll_event()
+ handler, reg_id = event_handlers[f]
+ handler(f, event)
+ event_handled = True
+ except StopIteration:
+ event_handled = True
+
+ if not event_handled:
+ raise AssertionError("tight loop")
+
+ def _schedule_yield(self):
+ """
+ Schedule for a short period of time chosen by the scheduler based
+ on internal state. Synchronous tasks should call this periodically
+ in order to allow the scheduler to service pending poll events. The
+ scheduler will call poll() exactly once, without blocking, and any
+ resulting poll events will be serviced.
+ """
+ event_handlers = self._poll_event_handlers
+ events_handled = 0
+
+ if not event_handlers:
+ return bool(events_handled)
+
+ if not self._poll_event_queue:
+ self._poll(0)
+
+ try:
+ while event_handlers and self._poll_event_queue:
+ f, event = self._next_poll_event()
+ handler, reg_id = event_handlers[f]
+ handler(f, event)
+ events_handled += 1
+ except StopIteration:
+ events_handled += 1
+
+ return bool(events_handled)
+
+ def _register(self, f, eventmask, handler):
+ """
+ @rtype: Integer
+ @return: A unique registration id, for use in schedule() or
+ unregister() calls.
+ """
+ if f in self._poll_event_handlers:
+ raise AssertionError("fd %d is already registered" % f)
+ self._event_handler_id += 1
+ reg_id = self._event_handler_id
+ self._poll_event_handler_ids[reg_id] = f
+ self._poll_event_handlers[f] = (handler, reg_id)
+ self._poll_obj.register(f, eventmask)
+ return reg_id
+
+ def _unregister(self, reg_id):
+ f = self._poll_event_handler_ids[reg_id]
+ self._poll_obj.unregister(f)
+ del self._poll_event_handlers[f]
+ del self._poll_event_handler_ids[reg_id]
+
+ def _schedule_wait(self, wait_ids):
+ """
+ Schedule until wait_id is not longer registered
+ for poll() events.
+ @type wait_id: int
+ @param wait_id: a task id to wait for
+ """
+ event_handlers = self._poll_event_handlers
+ handler_ids = self._poll_event_handler_ids
+ event_handled = False
+
+ if isinstance(wait_ids, int):
+ wait_ids = frozenset([wait_ids])
+
+ try:
+ while wait_ids.intersection(handler_ids):
+ f, event = self._next_poll_event()
+ handler, reg_id = event_handlers[f]
+ handler(f, event)
+ event_handled = True
+ except StopIteration:
+ event_handled = True
+
+ return event_handled
+
+
+_can_poll_device = None
+
+def can_poll_device():
+ """
+ Test if it's possible to use poll() on a device such as a pty. This
+ is known to fail on Darwin.
+ @rtype: bool
+ @returns: True if poll() on a device succeeds, False otherwise.
+ """
+
+ global _can_poll_device
+ if _can_poll_device is not None:
+ return _can_poll_device
+
+ if not hasattr(select, "poll"):
+ _can_poll_device = False
+ return _can_poll_device
+
+ try:
+ dev_null = open('/dev/null', 'rb')
+ except IOError:
+ _can_poll_device = False
+ return _can_poll_device
+
+ p = select.poll()
+ p.register(dev_null.fileno(), PollConstants.POLLIN)
+
+ invalid_request = False
+ for f, event in p.poll():
+ if event & PollConstants.POLLNVAL:
+ invalid_request = True
+ break
+ dev_null.close()
+
+ _can_poll_device = not invalid_request
+ return _can_poll_device
+
+def create_poll_instance():
+ """
+ Create an instance of select.poll, or an instance of
+ PollSelectAdapter there is no poll() implementation or
+ it is broken somehow.
+ """
+ if can_poll_device():
+ return select.poll()
+ return PollSelectAdapter()
diff --git a/pym/_emerge/QueueScheduler.py b/pym/_emerge/QueueScheduler.py
new file mode 100644
index 000000000..f88c13caa
--- /dev/null
+++ b/pym/_emerge/QueueScheduler.py
@@ -0,0 +1,77 @@
+from _emerge.PollScheduler import PollScheduler
+
+class QueueScheduler(PollScheduler):
+
+ """
+ Add instances of SequentialTaskQueue and then call run(). The
+ run() method returns when no tasks remain.
+ """
+
+ def __init__(self, max_jobs=None, max_load=None):
+ PollScheduler.__init__(self)
+
+ if max_jobs is None:
+ max_jobs = 1
+
+ self._max_jobs = max_jobs
+ self._max_load = max_load
+ self.sched_iface = self._sched_iface_class(
+ register=self._register,
+ schedule=self._schedule_wait,
+ unregister=self._unregister)
+
+ self._queues = []
+ self._schedule_listeners = []
+
+ def add(self, q):
+ self._queues.append(q)
+
+ def remove(self, q):
+ self._queues.remove(q)
+
+ def run(self):
+
+ while self._schedule():
+ self._poll_loop()
+
+ while self._running_job_count():
+ self._poll_loop()
+
+ def _schedule_tasks(self):
+ """
+ @rtype: bool
+ @returns: True if there may be remaining tasks to schedule,
+ False otherwise.
+ """
+ while self._can_add_job():
+ n = self._max_jobs - self._running_job_count()
+ if n < 1:
+ break
+
+ if not self._start_next_job(n):
+ return False
+
+ for q in self._queues:
+ if q:
+ return True
+ return False
+
+ def _running_job_count(self):
+ job_count = 0
+ for q in self._queues:
+ job_count += len(q.running_tasks)
+ self._jobs = job_count
+ return job_count
+
+ def _start_next_job(self, n=1):
+ started_count = 0
+ for q in self._queues:
+ initial_job_count = len(q.running_tasks)
+ q.schedule()
+ final_job_count = len(q.running_tasks)
+ if final_job_count > initial_job_count:
+ started_count += (final_job_count - initial_job_count)
+ if started_count >= n:
+ break
+ return started_count
+
diff --git a/pym/_emerge/TaskScheduler.py b/pym/_emerge/TaskScheduler.py
new file mode 100644
index 000000000..564f130aa
--- /dev/null
+++ b/pym/_emerge/TaskScheduler.py
@@ -0,0 +1,21 @@
+from _emerge.QueueScheduler import QueueScheduler
+from _emerge.SequentialTaskQueue import SequentialTaskQueue
+
+class TaskScheduler(object):
+
+ """
+ A simple way to handle scheduling of AsynchrousTask instances. Simply
+ add tasks and call run(). The run() method returns when no tasks remain.
+ """
+
+ def __init__(self, max_jobs=None, max_load=None):
+ self._queue = SequentialTaskQueue(max_jobs=max_jobs)
+ self._scheduler = QueueScheduler(
+ max_jobs=max_jobs, max_load=max_load)
+ self.sched_iface = self._scheduler.sched_iface
+ self.run = self._scheduler.run
+ self._scheduler.add(self._queue)
+
+ def add(self, task):
+ self._queue.add(task)
+
diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py
index 3a3dac25e..de5642da9 100644
--- a/pym/_emerge/__init__.py
+++ b/pym/_emerge/__init__.py
@@ -3,10 +3,8 @@
# Distributed under the terms of the GNU General Public License v2
# $Id$
-import formatter
import logging
import pwd
-import select
import shlex
import signal
import sys
@@ -61,7 +59,6 @@ from _emerge.DepPriorityNormalRange import DepPriorityNormalRange
from _emerge.DepPrioritySatisfiedRange import DepPrioritySatisfiedRange
from _emerge.Task import Task
from _emerge.Blocker import Blocker
-from _emerge.PollConstants import PollConstants
from _emerge.AsynchronousTask import AsynchronousTask
from _emerge.CompositeTask import CompositeTask
from _emerge.EbuildFetcher import EbuildFetcher
@@ -80,13 +77,12 @@ from _emerge.BlockerCache import BlockerCache
from _emerge.PackageVirtualDbapi import PackageVirtualDbapi
from _emerge.RepoDisplay import RepoDisplay
from _emerge.UseFlagDisplay import UseFlagDisplay
-from _emerge.PollSelectAdapter import PollSelectAdapter
from _emerge.SequentialTaskQueue import SequentialTaskQueue
from _emerge.ProgressHandler import ProgressHandler
from _emerge.stdout_spinner import stdout_spinner
from _emerge.UninstallFailure import UninstallFailure
from _emerge.JobStatusDisplay import JobStatusDisplay
-from _emerge.getloadavg import getloadavg
+from _emerge.PollScheduler import PollScheduler
def userquery(prompt, responses=None, colours=None):
"""Displays a prompt and a set of responses, then waits for a response
@@ -6445,341 +6441,6 @@ class PackageCounters(object):
(self.blocks - self.blocks_satisfied))
return "".join(myoutput)
-
-_can_poll_device = None
-
-def can_poll_device():
- """
- Test if it's possible to use poll() on a device such as a pty. This
- is known to fail on Darwin.
- @rtype: bool
- @returns: True if poll() on a device succeeds, False otherwise.
- """
-
- global _can_poll_device
- if _can_poll_device is not None:
- return _can_poll_device
-
- if not hasattr(select, "poll"):
- _can_poll_device = False
- return _can_poll_device
-
- try:
- dev_null = open('/dev/null', 'rb')
- except IOError:
- _can_poll_device = False
- return _can_poll_device
-
- p = select.poll()
- p.register(dev_null.fileno(), PollConstants.POLLIN)
-
- invalid_request = False
- for f, event in p.poll():
- if event & PollConstants.POLLNVAL:
- invalid_request = True
- break
- dev_null.close()
-
- _can_poll_device = not invalid_request
- return _can_poll_device
-
-def create_poll_instance():
- """
- Create an instance of select.poll, or an instance of
- PollSelectAdapter there is no poll() implementation or
- it is broken somehow.
- """
- if can_poll_device():
- return select.poll()
- return PollSelectAdapter()
-
-class PollScheduler(object):
-
- class _sched_iface_class(SlotObject):
- __slots__ = ("register", "schedule", "unregister")
-
- def __init__(self):
- self._max_jobs = 1
- self._max_load = None
- self._jobs = 0
- self._poll_event_queue = []
- self._poll_event_handlers = {}
- self._poll_event_handler_ids = {}
- # Increment id for each new handler.
- self._event_handler_id = 0
- self._poll_obj = create_poll_instance()
- self._scheduling = False
-
- def _schedule(self):
- """
- Calls _schedule_tasks() and automatically returns early from
- any recursive calls to this method that the _schedule_tasks()
- call might trigger. This makes _schedule() safe to call from
- inside exit listeners.
- """
- if self._scheduling:
- return False
- self._scheduling = True
- try:
- return self._schedule_tasks()
- finally:
- self._scheduling = False
-
- def _running_job_count(self):
- return self._jobs
-
- def _can_add_job(self):
- max_jobs = self._max_jobs
- max_load = self._max_load
-
- if self._max_jobs is not True and \
- self._running_job_count() >= self._max_jobs:
- return False
-
- if max_load is not None and \
- (max_jobs is True or max_jobs > 1) and \
- self._running_job_count() >= 1:
- try:
- avg1, avg5, avg15 = getloadavg()
- except OSError:
- return False
-
- if avg1 >= max_load:
- return False
-
- return True
-
- def _poll(self, timeout=None):
- """
- All poll() calls pass through here. The poll events
- are added directly to self._poll_event_queue.
- In order to avoid endless blocking, this raises
- StopIteration if timeout is None and there are
- no file descriptors to poll.
- """
- if not self._poll_event_handlers:
- self._schedule()
- if timeout is None and \
- not self._poll_event_handlers:
- raise StopIteration(
- "timeout is None and there are no poll() event handlers")
-
- # The following error is known to occur with Linux kernel versions
- # less than 2.6.24:
- #
- # select.error: (4, 'Interrupted system call')
- #
- # This error has been observed after a SIGSTOP, followed by SIGCONT.
- # Treat it similar to EAGAIN if timeout is None, otherwise just return
- # without any events.
- while True:
- try:
- self._poll_event_queue.extend(self._poll_obj.poll(timeout))
- break
- except select.error, e:
- writemsg_level("\n!!! select error: %s\n" % (e,),
- level=logging.ERROR, noiselevel=-1)
- del e
- if timeout is not None:
- break
-
- def _next_poll_event(self, timeout=None):
- """
- Since the _schedule_wait() loop is called by event
- handlers from _poll_loop(), maintain a central event
- queue for both of them to share events from a single
- poll() call. In order to avoid endless blocking, this
- raises StopIteration if timeout is None and there are
- no file descriptors to poll.
- """
- if not self._poll_event_queue:
- self._poll(timeout)
- return self._poll_event_queue.pop()
-
- def _poll_loop(self):
-
- event_handlers = self._poll_event_handlers
- event_handled = False
-
- try:
- while event_handlers:
- f, event = self._next_poll_event()
- handler, reg_id = event_handlers[f]
- handler(f, event)
- event_handled = True
- except StopIteration:
- event_handled = True
-
- if not event_handled:
- raise AssertionError("tight loop")
-
- def _schedule_yield(self):
- """
- Schedule for a short period of time chosen by the scheduler based
- on internal state. Synchronous tasks should call this periodically
- in order to allow the scheduler to service pending poll events. The
- scheduler will call poll() exactly once, without blocking, and any
- resulting poll events will be serviced.
- """
- event_handlers = self._poll_event_handlers
- events_handled = 0
-
- if not event_handlers:
- return bool(events_handled)
-
- if not self._poll_event_queue:
- self._poll(0)
-
- try:
- while event_handlers and self._poll_event_queue:
- f, event = self._next_poll_event()
- handler, reg_id = event_handlers[f]
- handler(f, event)
- events_handled += 1
- except StopIteration:
- events_handled += 1
-
- return bool(events_handled)
-
- def _register(self, f, eventmask, handler):
- """
- @rtype: Integer
- @return: A unique registration id, for use in schedule() or
- unregister() calls.
- """
- if f in self._poll_event_handlers:
- raise AssertionError("fd %d is already registered" % f)
- self._event_handler_id += 1
- reg_id = self._event_handler_id
- self._poll_event_handler_ids[reg_id] = f
- self._poll_event_handlers[f] = (handler, reg_id)
- self._poll_obj.register(f, eventmask)
- return reg_id
-
- def _unregister(self, reg_id):
- f = self._poll_event_handler_ids[reg_id]
- self._poll_obj.unregister(f)
- del self._poll_event_handlers[f]
- del self._poll_event_handler_ids[reg_id]
-
- def _schedule_wait(self, wait_ids):
- """
- Schedule until wait_id is not longer registered
- for poll() events.
- @type wait_id: int
- @param wait_id: a task id to wait for
- """
- event_handlers = self._poll_event_handlers
- handler_ids = self._poll_event_handler_ids
- event_handled = False
-
- if isinstance(wait_ids, int):
- wait_ids = frozenset([wait_ids])
-
- try:
- while wait_ids.intersection(handler_ids):
- f, event = self._next_poll_event()
- handler, reg_id = event_handlers[f]
- handler(f, event)
- event_handled = True
- except StopIteration:
- event_handled = True
-
- return event_handled
-
-class QueueScheduler(PollScheduler):
-
- """
- Add instances of SequentialTaskQueue and then call run(). The
- run() method returns when no tasks remain.
- """
-
- def __init__(self, max_jobs=None, max_load=None):
- PollScheduler.__init__(self)
-
- if max_jobs is None:
- max_jobs = 1
-
- self._max_jobs = max_jobs
- self._max_load = max_load
- self.sched_iface = self._sched_iface_class(
- register=self._register,
- schedule=self._schedule_wait,
- unregister=self._unregister)
-
- self._queues = []
- self._schedule_listeners = []
-
- def add(self, q):
- self._queues.append(q)
-
- def remove(self, q):
- self._queues.remove(q)
-
- def run(self):
-
- while self._schedule():
- self._poll_loop()
-
- while self._running_job_count():
- self._poll_loop()
-
- def _schedule_tasks(self):
- """
- @rtype: bool
- @returns: True if there may be remaining tasks to schedule,
- False otherwise.
- """
- while self._can_add_job():
- n = self._max_jobs - self._running_job_count()
- if n < 1:
- break
-
- if not self._start_next_job(n):
- return False
-
- for q in self._queues:
- if q:
- return True
- return False
-
- def _running_job_count(self):
- job_count = 0
- for q in self._queues:
- job_count += len(q.running_tasks)
- self._jobs = job_count
- return job_count
-
- def _start_next_job(self, n=1):
- started_count = 0
- for q in self._queues:
- initial_job_count = len(q.running_tasks)
- q.schedule()
- final_job_count = len(q.running_tasks)
- if final_job_count > initial_job_count:
- started_count += (final_job_count - initial_job_count)
- if started_count >= n:
- break
- return started_count
-
-class TaskScheduler(object):
-
- """
- A simple way to handle scheduling of AsynchrousTask instances. Simply
- add tasks and call run(). The run() method returns when no tasks remain.
- """
-
- def __init__(self, max_jobs=None, max_load=None):
- self._queue = SequentialTaskQueue(max_jobs=max_jobs)
- self._scheduler = QueueScheduler(
- max_jobs=max_jobs, max_load=max_load)
- self.sched_iface = self._scheduler.sched_iface
- self.run = self._scheduler.run
- self._scheduler.add(self._queue)
-
- def add(self, task):
- self._queue.add(task)
-
class Scheduler(PollScheduler):
_opts_ignore_blockers = \
diff --git a/pym/portage/tests/process/test_poll.py b/pym/portage/tests/process/test_poll.py
index f5669780f..3f4a597b4 100644
--- a/pym/portage/tests/process/test_poll.py
+++ b/pym/portage/tests/process/test_poll.py
@@ -8,7 +8,7 @@ import termios
import portage
from portage.output import get_term_size, set_term_size
from portage.tests import TestCase
-from _emerge import TaskScheduler
+from _emerge.TaskScheduler import TaskScheduler
from _emerge.PipeReader import PipeReader
from _emerge.SpawnProcess import SpawnProcess