summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--pym/_emerge/__init__.py267
-rw-r--r--pym/portage/tests/process/__init__.py3
-rw-r--r--pym/portage/tests/process/__test__0
-rw-r--r--pym/portage/tests/process/test_poll.py43
4 files changed, 272 insertions, 41 deletions
diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py
index 38f485aae..6f975449a 100644
--- a/pym/_emerge/__init__.py
+++ b/pym/_emerge/__init__.py
@@ -1498,13 +1498,17 @@ class AsynchronousTask(SlotObject):
"""
__slots__ = ("background", "cancelled", "returncode") + \
- ("_exit_listeners",)
+ ("_exit_listeners", "_start_listeners")
def start(self):
"""
Start an asynchronous task and then return as soon as possible.
"""
- pass
+ self._start()
+ self._start_hook()
+
+ def _start(self):
+ raise NotImplementedError(self)
def isAlive(self):
return self.returncode is None
@@ -1529,6 +1533,25 @@ class AsynchronousTask(SlotObject):
self.cancelled = True
self.wait()
+ def addStartListener(self, f):
+ """
+ The function will be called with one argument, a reference to self.
+ """
+ if self._start_listeners is None:
+ self._start_listeners = []
+ self._start_listeners.append(f)
+
+ def removeStartListener(self, f):
+ self._start_listeners.remove(f)
+
+ def _start_hook(self):
+ if self._start_listeners is not None:
+ start_listeners = self._start_listeners
+ self._start_listeners = None
+
+ for f in start_listeners:
+ f(self)
+
def addExitListener(self, f):
"""
The function will be called with one argument, a reference to self.
@@ -1558,7 +1581,63 @@ class AsynchronousTask(SlotObject):
for f in exit_listeners:
f(self)
- self._exit_listeners = None
+
+class PipeReader(AsynchronousTask):
+
+ """
+ Reads output from one or more files and saves it in memory,
+ for retrieval via the getvalue() method. This is driven by
+ the scheduler's poll() loop, so it runs entirely within the
+ current process.
+ """
+
+ __slots__ = ("input_files", "scheduler",) + \
+ ("pid", "registered", "_reg_ids", "_read_data")
+
+ def _start(self):
+ self._reg_ids = set()
+ self._read_data = []
+ for k, f in self.input_files.iteritems():
+ fcntl.fcntl(f.fileno(), fcntl.F_SETFL,
+ fcntl.fcntl(f.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK)
+ self._reg_ids.add(self.scheduler.register(f.fileno(),
+ PollConstants.POLLIN, self._output_handler))
+ self.registered = True
+
+ def isAlive(self):
+ return self.registered
+
+ def _wait(self):
+ if self.returncode is not None:
+ return self.returncode
+ if self.registered:
+ self.scheduler.schedule(self._reg_ids)
+ self.returncode = os.EX_OK
+ return self.returncode
+
+ def getvalue(self):
+ """Retrieve the entire contents"""
+ return "".join(self._read_data)
+
+ def close(self):
+ """Free the memory buffer."""
+ self._read_data = None
+
+ def _output_handler(self, fd, event):
+ files = self.input_files
+ for f in files.itervalues():
+ if fd == f.fileno():
+ break
+ self._read_data.append(f.read())
+ if not self._read_data[-1]:
+ for f in files.values():
+ f.close()
+ self.registered = False
+ for reg_id in self._reg_ids:
+ self.scheduler.unregister(reg_id)
+ self.wait()
+
+ return self.registered
class CompositeTask(AsynchronousTask):
@@ -1689,7 +1768,7 @@ class TaskSequence(CompositeTask):
def add(self, task):
self._task_queue.append(task)
- def start(self):
+ def _start(self):
self._start_next_task()
def cancel(self):
@@ -1802,7 +1881,7 @@ class SpawnProcess(SubProcess):
_files_dict = slot_dict_class(_file_names, prefix="")
_bufsize = 4096
- def start(self):
+ def _start(self):
if self.cancelled:
return
@@ -1930,7 +2009,7 @@ class EbuildFetcher(SpawnProcess):
__slots__ = ("fetchonly", "pkg",)
- def start(self):
+ def _start(self):
root_config = self.pkg.root_config
portdb = root_config.trees["porttree"].dbapi
@@ -1952,7 +2031,7 @@ class EbuildFetcher(SpawnProcess):
self.args = fetch_args
self.env = fetch_env
- SpawnProcess.start(self)
+ SpawnProcess._start(self)
class EbuildBuildDir(SlotObject):
@@ -2036,7 +2115,7 @@ class EbuildBuild(CompositeTask):
"prefetcher", "settings", "world_atom") + \
("_build_dir", "_buildpkg", "_ebuild_path", "_tree")
- def start(self):
+ def _start(self):
logger = self.logger
opts = self.opts
@@ -2266,7 +2345,7 @@ class EbuildExecuter(CompositeTask):
_phases = ("setup", "unpack", "compile", "test", "install")
- def start(self):
+ def _start(self):
pkg = self.pkg
scheduler = self.scheduler
tree = "porttree"
@@ -2323,7 +2402,7 @@ class EbuildMetadataPhase(SubProcess):
_bufsize = SpawnProcess._bufsize
_metadata_fd = 9
- def start(self):
+ def _start(self):
settings = self.settings
settings.reset()
ebuild_path = self.ebuild_path
@@ -2408,7 +2487,7 @@ class EbuildPhase(SubProcess):
_files_dict = slot_dict_class(_file_names, prefix="")
_bufsize = 4096
- def start(self):
+ def _start(self):
root_config = self.pkg.root_config
tree = self.tree
mydbapi = root_config.trees[tree].dbapi
@@ -2588,7 +2667,7 @@ class EbuildBinpkg(EbuildPhase):
"""
__slots__ = ("_binpkg_tmpfile",)
- def start(self):
+ def _start(self):
self.phase = "package"
self.tree = "porttree"
pkg = self.pkg
@@ -2607,7 +2686,7 @@ class EbuildBinpkg(EbuildPhase):
settings.backup_changes("PORTAGE_BINPKG_TMPFILE")
try:
- EbuildPhase.start(self)
+ EbuildPhase._start(self)
finally:
settings.pop("PORTAGE_BINPKG_TMPFILE", None)
@@ -2688,7 +2767,7 @@ class Binpkg(CompositeTask):
("_bintree", "_build_dir", "_ebuild_path", "_fetched_pkg",
"_image_dir", "_infloc", "_pkg_path", "_tree", "_verify")
- def start(self):
+ def _start(self):
pkg = self.pkg
settings = self.settings
@@ -2945,7 +3024,7 @@ class BinpkgFetcher(SpawnProcess):
pkg = self.pkg
self.pkg_path = pkg.root_config.trees["bintree"].getname(pkg.cpv)
- def start(self):
+ def _start(self):
if self.cancelled:
return
@@ -3004,7 +3083,7 @@ class BinpkgFetcher(SpawnProcess):
self.args = fetch_args
self.env = fetch_env
- SpawnProcess.start(self)
+ SpawnProcess._start(self)
def _set_returncode(self, wait_retval):
SpawnProcess._set_returncode(self, wait_retval)
@@ -3038,7 +3117,7 @@ class BinpkgFetcher(SpawnProcess):
class BinpkgVerifier(AsynchronousTask):
__slots__ = ("pkg",)
- def start(self):
+ def _start(self):
"""
Note: Unlike a normal AsynchronousTask.start() method,
this one does all work is synchronously. The returncode
@@ -3077,14 +3156,14 @@ class BinpkgExtractorAsync(SpawnProcess):
_shell_binary = portage.const.BASH_BINARY
- def start(self):
+ def _start(self):
self.args = [self._shell_binary, "-c",
"bzip2 -dqc -- %s | tar -xp -C %s -f -" % \
(portage._shell_quote(self.pkg_path),
portage._shell_quote(self.image_dir))]
self.env = self.pkg.root_config.settings.environ()
- SpawnProcess.start(self)
+ SpawnProcess._start(self)
class MergeListItem(CompositeTask):
@@ -3100,7 +3179,7 @@ class MergeListItem(CompositeTask):
"settings", "world_atom") + \
("_install_task",)
- def start(self):
+ def _start(self):
pkg = self.pkg
build_opts = self.build_opts
@@ -3213,7 +3292,7 @@ class PackageMerge(AsynchronousTask):
__slots__ = ("merge",)
- def start(self):
+ def _start(self):
self.returncode = self.merge.merge()
self.wait()
@@ -7729,7 +7808,8 @@ class PollSelectAdapter(PollConstants):
class SequentialTaskQueue(SlotObject):
- __slots__ = ("max_jobs", "running_tasks", "_task_queue", "_scheduling")
+ __slots__ = ("auto_schedule", "max_jobs", "running_tasks") + \
+ ("_task_queue", "_scheduling")
def __init__(self, **kwargs):
SlotObject.__init__(self, **kwargs)
@@ -7740,11 +7820,13 @@ class SequentialTaskQueue(SlotObject):
def add(self, task):
self._task_queue.append(task)
- self.schedule()
+ if self.auto_schedule:
+ self.schedule()
def addFront(self, task):
self._task_queue.appendleft(task)
- self.schedule()
+ if self.auto_schedule:
+ self.schedule()
def schedule(self):
@@ -7767,16 +7849,15 @@ class SequentialTaskQueue(SlotObject):
if hasattr(task, "registered") and task.registered:
continue
if task.poll() is not None:
- running_tasks.remove(task)
state_changed = True
while task_queue and (len(running_tasks) < max_jobs):
task = task_queue.popleft()
cancelled = getattr(task, "cancelled", None)
if not cancelled:
+ running_tasks.add(task)
task.addExitListener(self._task_exit)
task.start()
- running_tasks.add(task)
state_changed = True
self._scheduling = False
@@ -7784,7 +7865,9 @@ class SequentialTaskQueue(SlotObject):
return state_changed
def _task_exit(self, task):
- self.schedule()
+ self.running_tasks.remove(task)
+ if self.auto_schedule:
+ self.schedule()
def clear(self):
self._task_queue.clear()
@@ -7799,7 +7882,10 @@ class SequentialTaskQueue(SlotObject):
def __len__(self):
return len(self._task_queue) + len(self.running_tasks)
-class PollLoop(object):
+class PollScheduler(object):
+
+ class _sched_iface_class(SlotObject):
+ __slots__ = ("register", "schedule", "unregister")
def __init__(self):
self._max_jobs = 1
@@ -7814,15 +7900,18 @@ class PollLoop(object):
except AttributeError:
self._poll = PollSelectAdapter()
+ def _running_job_count(self):
+ return self._jobs
+
def _can_add_job(self):
- jobs = self._jobs
max_jobs = self._max_jobs
max_load = self._max_load
- if self._jobs >= self._max_jobs:
+ if self._running_job_count() >= self._max_jobs:
return False
- if max_load is not None and max_jobs > 1 and self._jobs > 1:
+ if max_load is not None and max_jobs > 1 and \
+ self._running_job_count() > 1:
try:
avg1, avg5, avg15 = os.getloadavg()
except OSError, e:
@@ -7872,7 +7961,7 @@ class PollLoop(object):
del self._poll_event_handlers[f]
del self._poll_event_handler_ids[reg_id]
- def _schedule(self, wait_id):
+ def _schedule(self, wait_ids):
"""
Schedule until wait_id is not longer registered
for poll() events.
@@ -7883,12 +7972,110 @@ class PollLoop(object):
handler_ids = self._poll_event_handler_ids
poll = self._poll.poll
- while wait_id in handler_ids:
+ if isinstance(wait_ids, int):
+ wait_ids = frozenset([wait_ids])
+
+ while wait_ids.intersection(handler_ids):
for f, event in poll():
handler, reg_id = event_handlers[f]
handler(f, event)
-class Scheduler(PollLoop):
+class QueueScheduler(PollScheduler):
+
+ """
+ Add instances of SequentialTaskQueue and then call run(). The
+ run() method returns when no tasks remain.
+ """
+
+ def __init__(self, max_jobs=None, max_load=None):
+ PollScheduler.__init__(self)
+
+ if max_jobs is None:
+ max_jobs = 1
+
+ self._max_jobs = max_jobs
+ self._max_load = max_load
+ self.sched_iface = self._sched_iface_class(
+ register=self._register,
+ schedule=self._schedule,
+ unregister=self._unregister)
+
+ self._queues = []
+ self._schedule_listeners = []
+
+ def add(self, q):
+ self._queues.append(q)
+
+ def remove(self, q):
+ self._queues.remove(q)
+
+ def run(self):
+
+ while self._schedule_tasks():
+ self._poll_loop()
+
+ while self._running_job_count():
+ self._poll_loop()
+
+ def _schedule_tasks(self):
+ """
+ @rtype: bool
+ @returns: True if there may be remaining tasks to schedule,
+ False otherwise.
+ """
+ while self._can_add_job():
+ n = self._max_jobs - self._running_job_count()
+ if n < 1:
+ break
+
+ if not self._start_next_job(n):
+ return False
+
+ for q in self._queues:
+ if q:
+ return True
+ return False
+
+ def _running_job_count(self):
+ job_count = 0
+ for q in self._queues:
+ job_count += len(q.running_tasks)
+ self._jobs = job_count
+ return job_count
+
+ def _start_next_job(self, n=1):
+ started_count = 0
+ for q in self._queues:
+ initial_job_count = len(q.running_tasks)
+ q.schedule()
+ final_job_count = len(q.running_tasks)
+ if final_job_count > initial_job_count:
+ started_count += (final_job_count - initial_job_count)
+ if started_count >= n:
+ break
+ return started_count
+
+class TaskScheduler(object):
+
+ """
+ A simple way to handle scheduling of AsynchrousTask instances. Simply
+ add tasks and call run(). The run() method returns when no tasks remain.
+ """
+
+ def __init__(self, max_jobs=None, max_load=None):
+ self._queue = SequentialTaskQueue(max_jobs=max_jobs)
+ self._scheduler = QueueScheduler(max_jobs=max_jobs, max_load=max_load)
+ self.sched_iface = self._scheduler.sched_iface
+ self.run = self._scheduler.run
+ self._scheduler.add(self._queue)
+
+ def add(self, task):
+ self._queue.add(task)
+
+ def run(self):
+ self._scheduler.schedule()
+
+class Scheduler(PollScheduler):
_opts_ignore_blockers = \
frozenset(["--buildpkgonly",
@@ -7930,7 +8117,7 @@ class Scheduler(PollLoop):
def __init__(self, settings, trees, mtimedb, myopts,
spinner, mergelist, favorites, digraph):
- PollLoop.__init__(self)
+ PollScheduler.__init__(self)
self.settings = settings
self.target_root = settings["ROOT"]
self.trees = trees
@@ -7970,7 +8157,8 @@ class Scheduler(PollLoop):
self._task_queues = self._task_queues_class()
for k in self._task_queues.allowed_keys:
- setattr(self._task_queues, k, SequentialTaskQueue())
+ setattr(self._task_queues, k,
+ SequentialTaskQueue(auto_schedule=True))
self._prefetchers = weakref.WeakValueDictionary()
self._pkg_queue = []
@@ -8738,13 +8926,10 @@ class Scheduler(PollLoop):
return pkg
-class MetadataRegen(PollLoop):
-
- class _sched_iface_class(SlotObject):
- __slots__ = ("register", "schedule", "unregister")
+class MetadataRegen(PollScheduler):
def __init__(self, portdb, max_jobs=None, max_load=None):
- PollLoop.__init__(self)
+ PollScheduler.__init__(self)
self._portdb = portdb
if max_jobs is None:
diff --git a/pym/portage/tests/process/__init__.py b/pym/portage/tests/process/__init__.py
new file mode 100644
index 000000000..a4a87a461
--- /dev/null
+++ b/pym/portage/tests/process/__init__.py
@@ -0,0 +1,3 @@
+# Copyright 1998-2008 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+# $Id: __init__.py 6870 2007-06-19 07:22:18Z zmedico $
diff --git a/pym/portage/tests/process/__test__ b/pym/portage/tests/process/__test__
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/pym/portage/tests/process/__test__
diff --git a/pym/portage/tests/process/test_poll.py b/pym/portage/tests/process/test_poll.py
new file mode 100644
index 000000000..45c0ad03e
--- /dev/null
+++ b/pym/portage/tests/process/test_poll.py
@@ -0,0 +1,43 @@
+# Copyright 1998-2008 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+# $Id: test_spawn.py 8474 2007-11-09 03:35:38Z zmedico $
+
+import errno, os, sys
+from portage.tests import TestCase
+from _emerge import PipeReader, SpawnProcess, TaskScheduler
+
+class PollTestCase(TestCase):
+
+ def testPipeReader(self):
+ """
+ Use a poll loop to read data from a pipe and assert that
+ the data written to the pipe is identical to the data
+ read from the pipe.
+ """
+
+ test_string = 2 * "blah blah blah\n"
+
+ master_fd, slave_fd = os.pipe()
+ master_file = os.fdopen(master_fd, 'r')
+
+ task_scheduler = TaskScheduler(max_jobs=2)
+ scheduler = task_scheduler.sched_iface
+
+ producer = SpawnProcess(
+ args=["bash", "-c", "echo -n '%s'" % test_string],
+ fd_pipes={1:slave_fd}, scheduler=scheduler)
+
+ consumer = PipeReader(
+ input_files={"producer" : master_file},
+ scheduler=scheduler)
+
+ task_scheduler.add(producer)
+ task_scheduler.add(consumer)
+
+ def producer_start_cb(task):
+ os.close(slave_fd)
+
+ producer.addStartListener(producer_start_cb)
+ task_scheduler.run()
+
+ self.assertEqual(test_string, consumer.getvalue())