summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2008-07-12 06:22:26 +0000
committerZac Medico <zmedico@gentoo.org>2008-07-12 06:22:26 +0000
commit31b18e72b3ed4077f31be583d9a83ad2de53df29 (patch)
tree29cb7823c5fc4fce43d29b5cb0e438f0f95e30ed
parentf9dc49ffd4486777e96b2e045c0b4523c479fef4 (diff)
downloadportage-31b18e72b3ed4077f31be583d9a83ad2de53df29.tar.gz
portage-31b18e72b3ed4077f31be583d9a83ad2de53df29.tar.bz2
portage-31b18e72b3ed4077f31be583d9a83ad2de53df29.zip
Create a test case for the poll loop which uses the loop to read data from a
pipe and assert that the data written to the pipe is identical to the data read from the pipe. In order to implement this test, several useful classes have been added: * PipeReader Reads output from one or more files and saves it in memory, for retrieval via the getvalue() method. This is driven by the scheduler's poll() loop, so it runs entirely within the current process. * QueueScheduler Add instances of SequentialTaskQueue and then call run(). The run() method returns when no tasks remain. * TaskScheduler A simple way to handle scheduling of AsynchronousTask instances. Simply add tasks and call run(). The run() method returns when no tasks remain. svn path=/main/trunk/; revision=11022
-rw-r--r--pym/_emerge/__init__.py267
-rw-r--r--pym/portage/tests/process/__init__.py3
-rw-r--r--pym/portage/tests/process/__test__0
-rw-r--r--pym/portage/tests/process/test_poll.py43
4 files changed, 272 insertions, 41 deletions
diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py
index 38f485aae..6f975449a 100644
--- a/pym/_emerge/__init__.py
+++ b/pym/_emerge/__init__.py
@@ -1498,13 +1498,17 @@ class AsynchronousTask(SlotObject):
"""
__slots__ = ("background", "cancelled", "returncode") + \
- ("_exit_listeners",)
+ ("_exit_listeners", "_start_listeners")
def start(self):
"""
Start an asynchronous task and then return as soon as possible.
"""
- pass
+ self._start()
+ self._start_hook()
+
+ def _start(self):
+ raise NotImplementedError(self)
def isAlive(self):
return self.returncode is None
@@ -1529,6 +1533,25 @@ class AsynchronousTask(SlotObject):
self.cancelled = True
self.wait()
+ def addStartListener(self, f):
+ """
+ The function will be called with one argument, a reference to self.
+ """
+ if self._start_listeners is None:
+ self._start_listeners = []
+ self._start_listeners.append(f)
+
+ def removeStartListener(self, f):
+ self._start_listeners.remove(f)
+
+ def _start_hook(self):
+ if self._start_listeners is not None:
+ start_listeners = self._start_listeners
+ self._start_listeners = None
+
+ for f in start_listeners:
+ f(self)
+
def addExitListener(self, f):
"""
The function will be called with one argument, a reference to self.
@@ -1558,7 +1581,63 @@ class AsynchronousTask(SlotObject):
for f in exit_listeners:
f(self)
- self._exit_listeners = None
+
+class PipeReader(AsynchronousTask):
+
+ """
+ Reads output from one or more files and saves it in memory,
+ for retrieval via the getvalue() method. This is driven by
+ the scheduler's poll() loop, so it runs entirely within the
+ current process.
+ """
+
+ __slots__ = ("input_files", "scheduler",) + \
+ ("pid", "registered", "_reg_ids", "_read_data")
+
+ def _start(self):
+ self._reg_ids = set()
+ self._read_data = []
+ for k, f in self.input_files.iteritems():
+ fcntl.fcntl(f.fileno(), fcntl.F_SETFL,
+ fcntl.fcntl(f.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)
+ self._reg_ids.add(self.scheduler.register(f.fileno(),
+ PollConstants.POLLIN, self._output_handler))
+ self.registered = True
+
+ def isAlive(self):
+ return self.registered
+
+ def _wait(self):
+ if self.returncode is not None:
+ return self.returncode
+ if self.registered:
+ self.scheduler.schedule(self._reg_ids)
+ self.returncode = os.EX_OK
+ return self.returncode
+
+ def getvalue(self):
+ """Retrieve the entire contents"""
+ return "".join(self._read_data)
+
+ def close(self):
+ """Free the memory buffer."""
+ self._read_data = None
+
+ def _output_handler(self, fd, event):
+ files = self.input_files
+ for f in files.itervalues():
+ if fd == f.fileno():
+ break
+ self._read_data.append(f.read())
+ if not self._read_data[-1]:
+ for f in files.values():
+ f.close()
+ self.registered = False
+ for reg_id in self._reg_ids:
+ self.scheduler.unregister(reg_id)
+ self.wait()
+
+ return self.registered
class CompositeTask(AsynchronousTask):
@@ -1689,7 +1768,7 @@ class TaskSequence(CompositeTask):
def add(self, task):
self._task_queue.append(task)
- def start(self):
+ def _start(self):
self._start_next_task()
def cancel(self):
@@ -1802,7 +1881,7 @@ class SpawnProcess(SubProcess):
_files_dict = slot_dict_class(_file_names, prefix="")
_bufsize = 4096
- def start(self):
+ def _start(self):
if self.cancelled:
return
@@ -1930,7 +2009,7 @@ class EbuildFetcher(SpawnProcess):
__slots__ = ("fetchonly", "pkg",)
- def start(self):
+ def _start(self):
root_config = self.pkg.root_config
portdb = root_config.trees["porttree"].dbapi
@@ -1952,7 +2031,7 @@ class EbuildFetcher(SpawnProcess):
self.args = fetch_args
self.env = fetch_env
- SpawnProcess.start(self)
+ SpawnProcess._start(self)
class EbuildBuildDir(SlotObject):
@@ -2036,7 +2115,7 @@ class EbuildBuild(CompositeTask):
"prefetcher", "settings", "world_atom") + \
("_build_dir", "_buildpkg", "_ebuild_path", "_tree")
- def start(self):
+ def _start(self):
logger = self.logger
opts = self.opts
@@ -2266,7 +2345,7 @@ class EbuildExecuter(CompositeTask):
_phases = ("setup", "unpack", "compile", "test", "install")
- def start(self):
+ def _start(self):
pkg = self.pkg
scheduler = self.scheduler
tree = "porttree"
@@ -2323,7 +2402,7 @@ class EbuildMetadataPhase(SubProcess):
_bufsize = SpawnProcess._bufsize
_metadata_fd = 9
- def start(self):
+ def _start(self):
settings = self.settings
settings.reset()
ebuild_path = self.ebuild_path
@@ -2408,7 +2487,7 @@ class EbuildPhase(SubProcess):
_files_dict = slot_dict_class(_file_names, prefix="")
_bufsize = 4096
- def start(self):
+ def _start(self):
root_config = self.pkg.root_config
tree = self.tree
mydbapi = root_config.trees[tree].dbapi
@@ -2588,7 +2667,7 @@ class EbuildBinpkg(EbuildPhase):
"""
__slots__ = ("_binpkg_tmpfile",)
- def start(self):
+ def _start(self):
self.phase = "package"
self.tree = "porttree"
pkg = self.pkg
@@ -2607,7 +2686,7 @@ class EbuildBinpkg(EbuildPhase):
settings.backup_changes("PORTAGE_BINPKG_TMPFILE")
try:
- EbuildPhase.start(self)
+ EbuildPhase._start(self)
finally:
settings.pop("PORTAGE_BINPKG_TMPFILE", None)
@@ -2688,7 +2767,7 @@ class Binpkg(CompositeTask):
("_bintree", "_build_dir", "_ebuild_path", "_fetched_pkg",
"_image_dir", "_infloc", "_pkg_path", "_tree", "_verify")
- def start(self):
+ def _start(self):
pkg = self.pkg
settings = self.settings
@@ -2945,7 +3024,7 @@ class BinpkgFetcher(SpawnProcess):
pkg = self.pkg
self.pkg_path = pkg.root_config.trees["bintree"].getname(pkg.cpv)
- def start(self):
+ def _start(self):
if self.cancelled:
return
@@ -3004,7 +3083,7 @@ class BinpkgFetcher(SpawnProcess):
self.args = fetch_args
self.env = fetch_env
- SpawnProcess.start(self)
+ SpawnProcess._start(self)
def _set_returncode(self, wait_retval):
SpawnProcess._set_returncode(self, wait_retval)
@@ -3038,7 +3117,7 @@ class BinpkgFetcher(SpawnProcess):
class BinpkgVerifier(AsynchronousTask):
__slots__ = ("pkg",)
- def start(self):
+ def _start(self):
"""
Note: Unlike a normal AsynchronousTask.start() method,
this one does all work is synchronously. The returncode
@@ -3077,14 +3156,14 @@ class BinpkgExtractorAsync(SpawnProcess):
_shell_binary = portage.const.BASH_BINARY
- def start(self):
+ def _start(self):
self.args = [self._shell_binary, "-c",
"bzip2 -dqc -- %s | tar -xp -C %s -f -" % \
(portage._shell_quote(self.pkg_path),
portage._shell_quote(self.image_dir))]
self.env = self.pkg.root_config.settings.environ()
- SpawnProcess.start(self)
+ SpawnProcess._start(self)
class MergeListItem(CompositeTask):
@@ -3100,7 +3179,7 @@ class MergeListItem(CompositeTask):
"settings", "world_atom") + \
("_install_task",)
- def start(self):
+ def _start(self):
pkg = self.pkg
build_opts = self.build_opts
@@ -3213,7 +3292,7 @@ class PackageMerge(AsynchronousTask):
__slots__ = ("merge",)
- def start(self):
+ def _start(self):
self.returncode = self.merge.merge()
self.wait()
@@ -7729,7 +7808,8 @@ class PollSelectAdapter(PollConstants):
class SequentialTaskQueue(SlotObject):
- __slots__ = ("max_jobs", "running_tasks", "_task_queue", "_scheduling")
+ __slots__ = ("auto_schedule", "max_jobs", "running_tasks") + \
+ ("_task_queue", "_scheduling")
def __init__(self, **kwargs):
SlotObject.__init__(self, **kwargs)
@@ -7740,11 +7820,13 @@ class SequentialTaskQueue(SlotObject):
def add(self, task):
self._task_queue.append(task)
- self.schedule()
+ if self.auto_schedule:
+ self.schedule()
def addFront(self, task):
self._task_queue.appendleft(task)
- self.schedule()
+ if self.auto_schedule:
+ self.schedule()
def schedule(self):
@@ -7767,16 +7849,15 @@ class SequentialTaskQueue(SlotObject):
if hasattr(task, "registered") and task.registered:
continue
if task.poll() is not None:
- running_tasks.remove(task)
state_changed = True
while task_queue and (len(running_tasks) < max_jobs):
task = task_queue.popleft()
cancelled = getattr(task, "cancelled", None)
if not cancelled:
+ running_tasks.add(task)
task.addExitListener(self._task_exit)
task.start()
- running_tasks.add(task)
state_changed = True
self._scheduling = False
@@ -7784,7 +7865,9 @@ class SequentialTaskQueue(SlotObject):
return state_changed
def _task_exit(self, task):
- self.schedule()
+ self.running_tasks.remove(task)
+ if self.auto_schedule:
+ self.schedule()
def clear(self):
self._task_queue.clear()
@@ -7799,7 +7882,10 @@ class SequentialTaskQueue(SlotObject):
def __len__(self):
return len(self._task_queue) + len(self.running_tasks)
-class PollLoop(object):
+class PollScheduler(object):
+
+ class _sched_iface_class(SlotObject):
+ __slots__ = ("register", "schedule", "unregister")
def __init__(self):
self._max_jobs = 1
@@ -7814,15 +7900,18 @@ class PollLoop(object):
except AttributeError:
self._poll = PollSelectAdapter()
+ def _running_job_count(self):
+ return self._jobs
+
def _can_add_job(self):
- jobs = self._jobs
max_jobs = self._max_jobs
max_load = self._max_load
- if self._jobs >= self._max_jobs:
+ if self._running_job_count() >= self._max_jobs:
return False
- if max_load is not None and max_jobs > 1 and self._jobs > 1:
+ if max_load is not None and max_jobs > 1 and \
+ self._running_job_count() > 1:
try:
avg1, avg5, avg15 = os.getloadavg()
except OSError, e:
@@ -7872,7 +7961,7 @@ class PollLoop(object):
del self._poll_event_handlers[f]
del self._poll_event_handler_ids[reg_id]
- def _schedule(self, wait_id):
+ def _schedule(self, wait_ids):
"""
Schedule until wait_id is not longer registered
for poll() events.
@@ -7883,12 +7972,110 @@ class PollLoop(object):
handler_ids = self._poll_event_handler_ids
poll = self._poll.poll
- while wait_id in handler_ids:
+ if isinstance(wait_ids, int):
+ wait_ids = frozenset([wait_ids])
+
+ while wait_ids.intersection(handler_ids):
for f, event in poll():
handler, reg_id = event_handlers[f]
handler(f, event)
-class Scheduler(PollLoop):
+class QueueScheduler(PollScheduler):
+
+ """
+ Add instances of SequentialTaskQueue and then call run(). The
+ run() method returns when no tasks remain.
+ """
+
+ def __init__(self, max_jobs=None, max_load=None):
+ PollScheduler.__init__(self)
+
+ if max_jobs is None:
+ max_jobs = 1
+
+ self._max_jobs = max_jobs
+ self._max_load = max_load
+ self.sched_iface = self._sched_iface_class(
+ register=self._register,
+ schedule=self._schedule,
+ unregister=self._unregister)
+
+ self._queues = []
+ self._schedule_listeners = []
+
+ def add(self, q):
+ self._queues.append(q)
+
+ def remove(self, q):
+ self._queues.remove(q)
+
+ def run(self):
+
+ while self._schedule_tasks():
+ self._poll_loop()
+
+ while self._running_job_count():
+ self._poll_loop()
+
+ def _schedule_tasks(self):
+ """
+ @rtype: bool
+ @returns: True if there may be remaining tasks to schedule,
+ False otherwise.
+ """
+ while self._can_add_job():
+ n = self._max_jobs - self._running_job_count()
+ if n < 1:
+ break
+
+ if not self._start_next_job(n):
+ return False
+
+ for q in self._queues:
+ if q:
+ return True
+ return False
+
+ def _running_job_count(self):
+ job_count = 0
+ for q in self._queues:
+ job_count += len(q.running_tasks)
+ self._jobs = job_count
+ return job_count
+
+ def _start_next_job(self, n=1):
+ started_count = 0
+ for q in self._queues:
+ initial_job_count = len(q.running_tasks)
+ q.schedule()
+ final_job_count = len(q.running_tasks)
+ if final_job_count > initial_job_count:
+ started_count += (final_job_count - initial_job_count)
+ if started_count >= n:
+ break
+ return started_count
+
+class TaskScheduler(object):
+
+ """
+ A simple way to handle scheduling of AsynchrousTask instances. Simply
+ add tasks and call run(). The run() method returns when no tasks remain.
+ """
+
+ def __init__(self, max_jobs=None, max_load=None):
+ self._queue = SequentialTaskQueue(max_jobs=max_jobs)
+ self._scheduler = QueueScheduler(max_jobs=max_jobs, max_load=max_load)
+ self.sched_iface = self._scheduler.sched_iface
+ self.run = self._scheduler.run
+ self._scheduler.add(self._queue)
+
+ def add(self, task):
+ self._queue.add(task)
+
+ def run(self):
+ self._scheduler.schedule()
+
+class Scheduler(PollScheduler):
_opts_ignore_blockers = \
frozenset(["--buildpkgonly",
@@ -7930,7 +8117,7 @@ class Scheduler(PollLoop):
def __init__(self, settings, trees, mtimedb, myopts,
spinner, mergelist, favorites, digraph):
- PollLoop.__init__(self)
+ PollScheduler.__init__(self)
self.settings = settings
self.target_root = settings["ROOT"]
self.trees = trees
@@ -7970,7 +8157,8 @@ class Scheduler(PollLoop):
self._task_queues = self._task_queues_class()
for k in self._task_queues.allowed_keys:
- setattr(self._task_queues, k, SequentialTaskQueue())
+ setattr(self._task_queues, k,
+ SequentialTaskQueue(auto_schedule=True))
self._prefetchers = weakref.WeakValueDictionary()
self._pkg_queue = []
@@ -8738,13 +8926,10 @@ class Scheduler(PollLoop):
return pkg
-class MetadataRegen(PollLoop):
-
- class _sched_iface_class(SlotObject):
- __slots__ = ("register", "schedule", "unregister")
+class MetadataRegen(PollScheduler):
def __init__(self, portdb, max_jobs=None, max_load=None):
- PollLoop.__init__(self)
+ PollScheduler.__init__(self)
self._portdb = portdb
if max_jobs is None:
diff --git a/pym/portage/tests/process/__init__.py b/pym/portage/tests/process/__init__.py
new file mode 100644
index 000000000..a4a87a461
--- /dev/null
+++ b/pym/portage/tests/process/__init__.py
@@ -0,0 +1,3 @@
+# Copyright 1998-2008 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+# $Id: __init__.py 6870 2007-06-19 07:22:18Z zmedico $
diff --git a/pym/portage/tests/process/__test__ b/pym/portage/tests/process/__test__
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/pym/portage/tests/process/__test__
diff --git a/pym/portage/tests/process/test_poll.py b/pym/portage/tests/process/test_poll.py
new file mode 100644
index 000000000..45c0ad03e
--- /dev/null
+++ b/pym/portage/tests/process/test_poll.py
@@ -0,0 +1,43 @@
+# Copyright 1998-2008 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+# $Id: test_spawn.py 8474 2007-11-09 03:35:38Z zmedico $
+
+import errno, os, sys
+from portage.tests import TestCase
+from _emerge import PipeReader, SpawnProcess, TaskScheduler
+
+class PollTestCase(TestCase):
+
+ def testPipeReader(self):
+ """
+ Use a poll loop to read data from a pipe and assert that
+ the data written to the pipe is identical to the data
+ read from the pipe.
+ """
+
+ test_string = 2 * "blah blah blah\n"
+
+ master_fd, slave_fd = os.pipe()
+ master_file = os.fdopen(master_fd, 'r')
+
+ task_scheduler = TaskScheduler(max_jobs=2)
+ scheduler = task_scheduler.sched_iface
+
+ producer = SpawnProcess(
+ args=["bash", "-c", "echo -n '%s'" % test_string],
+ fd_pipes={1:slave_fd}, scheduler=scheduler)
+
+ consumer = PipeReader(
+ input_files={"producer" : master_file},
+ scheduler=scheduler)
+
+ task_scheduler.add(producer)
+ task_scheduler.add(consumer)
+
+ def producer_start_cb(task):
+ os.close(slave_fd)
+
+ producer.addStartListener(producer_start_cb)
+ task_scheduler.run()
+
+ self.assertEqual(test_string, consumer.getvalue())