diff options
-rw-r--r-- | pym/_emerge/__init__.py | 267 | ||||
-rw-r--r-- | pym/portage/tests/process/__init__.py | 3 | ||||
-rw-r--r-- | pym/portage/tests/process/__test__ | 0 | ||||
-rw-r--r-- | pym/portage/tests/process/test_poll.py | 43 |
4 files changed, 272 insertions, 41 deletions
diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py index 38f485aae..6f975449a 100644 --- a/pym/_emerge/__init__.py +++ b/pym/_emerge/__init__.py @@ -1498,13 +1498,17 @@ class AsynchronousTask(SlotObject): """ __slots__ = ("background", "cancelled", "returncode") + \ - ("_exit_listeners",) + ("_exit_listeners", "_start_listeners") def start(self): """ Start an asynchronous task and then return as soon as possible. """ - pass + self._start() + self._start_hook() + + def _start(self): + raise NotImplementedError(self) def isAlive(self): return self.returncode is None @@ -1529,6 +1533,25 @@ class AsynchronousTask(SlotObject): self.cancelled = True self.wait() + def addStartListener(self, f): + """ + The function will be called with one argument, a reference to self. + """ + if self._start_listeners is None: + self._start_listeners = [] + self._start_listeners.append(f) + + def removeStartListener(self, f): + self._start_listeners.remove(f) + + def _start_hook(self): + if self._start_listeners is not None: + start_listeners = self._start_listeners + self._start_listeners = None + + for f in start_listeners: + f(self) + def addExitListener(self, f): """ The function will be called with one argument, a reference to self. @@ -1558,7 +1581,63 @@ class AsynchronousTask(SlotObject): for f in exit_listeners: f(self) - self._exit_listeners = None + +class PipeReader(AsynchronousTask): + + """ + Reads output from one or more files and saves it in memory, + for retrieval via the getvalue() method. This is driven by + the scheduler's poll() loop, so it runs entirely within the + current process. + """ + + __slots__ = ("input_files", "scheduler",) + \ + ("pid", "registered", "_reg_ids", "_read_data") + + def _start(self): + self._reg_ids = set() + self._read_data = [] + for k, f in self.input_files.iteritems(): + fcntl.fcntl(f.fileno(), fcntl.F_SETFL, + fcntl.fcntl(f.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) + self._reg_ids.add(self.scheduler.register(f.fileno(), + PollConstants.POLLIN, self._output_handler)) + self.registered = True + + def isAlive(self): + return self.registered + + def _wait(self): + if self.returncode is not None: + return self.returncode + if self.registered: + self.scheduler.schedule(self._reg_ids) + self.returncode = os.EX_OK + return self.returncode + + def getvalue(self): + """Retrieve the entire contents""" + return "".join(self._read_data) + + def close(self): + """Free the memory buffer.""" + self._read_data = None + + def _output_handler(self, fd, event): + files = self.input_files + for f in files.itervalues(): + if fd == f.fileno(): + break + self._read_data.append(f.read()) + if not self._read_data[-1]: + for f in files.values(): + f.close() + self.registered = False + for reg_id in self._reg_ids: + self.scheduler.unregister(reg_id) + self.wait() + + return self.registered class CompositeTask(AsynchronousTask): @@ -1689,7 +1768,7 @@ class TaskSequence(CompositeTask): def add(self, task): self._task_queue.append(task) - def start(self): + def _start(self): self._start_next_task() def cancel(self): @@ -1802,7 +1881,7 @@ class SpawnProcess(SubProcess): _files_dict = slot_dict_class(_file_names, prefix="") _bufsize = 4096 - def start(self): + def _start(self): if self.cancelled: return @@ -1930,7 +2009,7 @@ class EbuildFetcher(SpawnProcess): __slots__ = ("fetchonly", "pkg",) - def start(self): + def _start(self): root_config = self.pkg.root_config portdb = root_config.trees["porttree"].dbapi @@ -1952,7 +2031,7 @@ class EbuildFetcher(SpawnProcess): self.args = fetch_args self.env = fetch_env - SpawnProcess.start(self) + SpawnProcess._start(self) class EbuildBuildDir(SlotObject): @@ -2036,7 +2115,7 @@ class EbuildBuild(CompositeTask): "prefetcher", "settings", "world_atom") + \ ("_build_dir", "_buildpkg", "_ebuild_path", "_tree") - def start(self): + def _start(self): logger = self.logger opts = self.opts @@ -2266,7 +2345,7 @@ class EbuildExecuter(CompositeTask): _phases = ("setup", "unpack", "compile", "test", "install") - def start(self): + def _start(self): pkg = self.pkg scheduler = self.scheduler tree = "porttree" @@ -2323,7 +2402,7 @@ class EbuildMetadataPhase(SubProcess): _bufsize = SpawnProcess._bufsize _metadata_fd = 9 - def start(self): + def _start(self): settings = self.settings settings.reset() ebuild_path = self.ebuild_path @@ -2408,7 +2487,7 @@ class EbuildPhase(SubProcess): _files_dict = slot_dict_class(_file_names, prefix="") _bufsize = 4096 - def start(self): + def _start(self): root_config = self.pkg.root_config tree = self.tree mydbapi = root_config.trees[tree].dbapi @@ -2588,7 +2667,7 @@ class EbuildBinpkg(EbuildPhase): """ __slots__ = ("_binpkg_tmpfile",) - def start(self): + def _start(self): self.phase = "package" self.tree = "porttree" pkg = self.pkg @@ -2607,7 +2686,7 @@ class EbuildBinpkg(EbuildPhase): settings.backup_changes("PORTAGE_BINPKG_TMPFILE") try: - EbuildPhase.start(self) + EbuildPhase._start(self) finally: settings.pop("PORTAGE_BINPKG_TMPFILE", None) @@ -2688,7 +2767,7 @@ class Binpkg(CompositeTask): ("_bintree", "_build_dir", "_ebuild_path", "_fetched_pkg", "_image_dir", "_infloc", "_pkg_path", "_tree", "_verify") - def start(self): + def _start(self): pkg = self.pkg settings = self.settings @@ -2945,7 +3024,7 @@ class BinpkgFetcher(SpawnProcess): pkg = self.pkg self.pkg_path = pkg.root_config.trees["bintree"].getname(pkg.cpv) - def start(self): + def _start(self): if self.cancelled: return @@ -3004,7 +3083,7 @@ class BinpkgFetcher(SpawnProcess): self.args = fetch_args self.env = fetch_env - SpawnProcess.start(self) + SpawnProcess._start(self) def _set_returncode(self, wait_retval): SpawnProcess._set_returncode(self, wait_retval) @@ -3038,7 +3117,7 @@ class BinpkgFetcher(SpawnProcess): class BinpkgVerifier(AsynchronousTask): __slots__ = ("pkg",) - def start(self): + def _start(self): """ Note: Unlike a normal AsynchronousTask.start() method, this one does all work is synchronously. The returncode @@ -3077,14 +3156,14 @@ class BinpkgExtractorAsync(SpawnProcess): _shell_binary = portage.const.BASH_BINARY - def start(self): + def _start(self): self.args = [self._shell_binary, "-c", "bzip2 -dqc -- %s | tar -xp -C %s -f -" % \ (portage._shell_quote(self.pkg_path), portage._shell_quote(self.image_dir))] self.env = self.pkg.root_config.settings.environ() - SpawnProcess.start(self) + SpawnProcess._start(self) class MergeListItem(CompositeTask): @@ -3100,7 +3179,7 @@ class MergeListItem(CompositeTask): "settings", "world_atom") + \ ("_install_task",) - def start(self): + def _start(self): pkg = self.pkg build_opts = self.build_opts @@ -3213,7 +3292,7 @@ class PackageMerge(AsynchronousTask): __slots__ = ("merge",) - def start(self): + def _start(self): self.returncode = self.merge.merge() self.wait() @@ -7729,7 +7808,8 @@ class PollSelectAdapter(PollConstants): class SequentialTaskQueue(SlotObject): - __slots__ = ("max_jobs", "running_tasks", "_task_queue", "_scheduling") + __slots__ = ("auto_schedule", "max_jobs", "running_tasks") + \ + ("_task_queue", "_scheduling") def __init__(self, **kwargs): SlotObject.__init__(self, **kwargs) @@ -7740,11 +7820,13 @@ class SequentialTaskQueue(SlotObject): def add(self, task): self._task_queue.append(task) - self.schedule() + if self.auto_schedule: + self.schedule() def addFront(self, task): self._task_queue.appendleft(task) - self.schedule() + if self.auto_schedule: + self.schedule() def schedule(self): @@ -7767,16 +7849,15 @@ class SequentialTaskQueue(SlotObject): if hasattr(task, "registered") and task.registered: continue if task.poll() is not None: - running_tasks.remove(task) state_changed = True while task_queue and (len(running_tasks) < max_jobs): task = task_queue.popleft() cancelled = getattr(task, "cancelled", None) if not cancelled: + running_tasks.add(task) task.addExitListener(self._task_exit) task.start() - running_tasks.add(task) state_changed = True self._scheduling = False @@ -7784,7 +7865,9 @@ class SequentialTaskQueue(SlotObject): return state_changed def _task_exit(self, task): - self.schedule() + self.running_tasks.remove(task) + if self.auto_schedule: + self.schedule() def clear(self): self._task_queue.clear() @@ -7799,7 +7882,10 @@ class SequentialTaskQueue(SlotObject): def __len__(self): return len(self._task_queue) + len(self.running_tasks) -class PollLoop(object): +class PollScheduler(object): + + class _sched_iface_class(SlotObject): + __slots__ = ("register", "schedule", "unregister") def __init__(self): self._max_jobs = 1 @@ -7814,15 +7900,18 @@ class PollLoop(object): except AttributeError: self._poll = PollSelectAdapter() + def _running_job_count(self): + return self._jobs + def _can_add_job(self): - jobs = self._jobs max_jobs = self._max_jobs max_load = self._max_load - if self._jobs >= self._max_jobs: + if self._running_job_count() >= self._max_jobs: return False - if max_load is not None and max_jobs > 1 and self._jobs > 1: + if max_load is not None and max_jobs > 1 and \ + self._running_job_count() > 1: try: avg1, avg5, avg15 = os.getloadavg() except OSError, e: @@ -7872,7 +7961,7 @@ class PollLoop(object): del self._poll_event_handlers[f] del self._poll_event_handler_ids[reg_id] - def _schedule(self, wait_id): + def _schedule(self, wait_ids): """ Schedule until wait_id is not longer registered for poll() events. @@ -7883,12 +7972,110 @@ class PollLoop(object): handler_ids = self._poll_event_handler_ids poll = self._poll.poll - while wait_id in handler_ids: + if isinstance(wait_ids, int): + wait_ids = frozenset([wait_ids]) + + while wait_ids.intersection(handler_ids): for f, event in poll(): handler, reg_id = event_handlers[f] handler(f, event) -class Scheduler(PollLoop): +class QueueScheduler(PollScheduler): + + """ + Add instances of SequentialTaskQueue and then call run(). The + run() method returns when no tasks remain. + """ + + def __init__(self, max_jobs=None, max_load=None): + PollScheduler.__init__(self) + + if max_jobs is None: + max_jobs = 1 + + self._max_jobs = max_jobs + self._max_load = max_load + self.sched_iface = self._sched_iface_class( + register=self._register, + schedule=self._schedule, + unregister=self._unregister) + + self._queues = [] + self._schedule_listeners = [] + + def add(self, q): + self._queues.append(q) + + def remove(self, q): + self._queues.remove(q) + + def run(self): + + while self._schedule_tasks(): + self._poll_loop() + + while self._running_job_count(): + self._poll_loop() + + def _schedule_tasks(self): + """ + @rtype: bool + @returns: True if there may be remaining tasks to schedule, + False otherwise. + """ + while self._can_add_job(): + n = self._max_jobs - self._running_job_count() + if n < 1: + break + + if not self._start_next_job(n): + return False + + for q in self._queues: + if q: + return True + return False + + def _running_job_count(self): + job_count = 0 + for q in self._queues: + job_count += len(q.running_tasks) + self._jobs = job_count + return job_count + + def _start_next_job(self, n=1): + started_count = 0 + for q in self._queues: + initial_job_count = len(q.running_tasks) + q.schedule() + final_job_count = len(q.running_tasks) + if final_job_count > initial_job_count: + started_count += (final_job_count - initial_job_count) + if started_count >= n: + break + return started_count + +class TaskScheduler(object): + + """ + A simple way to handle scheduling of AsynchrousTask instances. Simply + add tasks and call run(). The run() method returns when no tasks remain. + """ + + def __init__(self, max_jobs=None, max_load=None): + self._queue = SequentialTaskQueue(max_jobs=max_jobs) + self._scheduler = QueueScheduler(max_jobs=max_jobs, max_load=max_load) + self.sched_iface = self._scheduler.sched_iface + self.run = self._scheduler.run + self._scheduler.add(self._queue) + + def add(self, task): + self._queue.add(task) + + def run(self): + self._scheduler.schedule() + +class Scheduler(PollScheduler): _opts_ignore_blockers = \ frozenset(["--buildpkgonly", @@ -7930,7 +8117,7 @@ class Scheduler(PollLoop): def __init__(self, settings, trees, mtimedb, myopts, spinner, mergelist, favorites, digraph): - PollLoop.__init__(self) + PollScheduler.__init__(self) self.settings = settings self.target_root = settings["ROOT"] self.trees = trees @@ -7970,7 +8157,8 @@ class Scheduler(PollLoop): self._task_queues = self._task_queues_class() for k in self._task_queues.allowed_keys: - setattr(self._task_queues, k, SequentialTaskQueue()) + setattr(self._task_queues, k, + SequentialTaskQueue(auto_schedule=True)) self._prefetchers = weakref.WeakValueDictionary() self._pkg_queue = [] @@ -8738,13 +8926,10 @@ class Scheduler(PollLoop): return pkg -class MetadataRegen(PollLoop): - - class _sched_iface_class(SlotObject): - __slots__ = ("register", "schedule", "unregister") +class MetadataRegen(PollScheduler): def __init__(self, portdb, max_jobs=None, max_load=None): - PollLoop.__init__(self) + PollScheduler.__init__(self) self._portdb = portdb if max_jobs is None: diff --git a/pym/portage/tests/process/__init__.py b/pym/portage/tests/process/__init__.py new file mode 100644 index 000000000..a4a87a461 --- /dev/null +++ b/pym/portage/tests/process/__init__.py @@ -0,0 +1,3 @@ +# Copyright 1998-2008 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 +# $Id: __init__.py 6870 2007-06-19 07:22:18Z zmedico $ diff --git a/pym/portage/tests/process/__test__ b/pym/portage/tests/process/__test__ new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/pym/portage/tests/process/__test__ diff --git a/pym/portage/tests/process/test_poll.py b/pym/portage/tests/process/test_poll.py new file mode 100644 index 000000000..45c0ad03e --- /dev/null +++ b/pym/portage/tests/process/test_poll.py @@ -0,0 +1,43 @@ +# Copyright 1998-2008 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 +# $Id: test_spawn.py 8474 2007-11-09 03:35:38Z zmedico $ + +import errno, os, sys +from portage.tests import TestCase +from _emerge import PipeReader, SpawnProcess, TaskScheduler + +class PollTestCase(TestCase): + + def testPipeReader(self): + """ + Use a poll loop to read data from a pipe and assert that + the data written to the pipe is identical to the data + read from the pipe. + """ + + test_string = 2 * "blah blah blah\n" + + master_fd, slave_fd = os.pipe() + master_file = os.fdopen(master_fd, 'r') + + task_scheduler = TaskScheduler(max_jobs=2) + scheduler = task_scheduler.sched_iface + + producer = SpawnProcess( + args=["bash", "-c", "echo -n '%s'" % test_string], + fd_pipes={1:slave_fd}, scheduler=scheduler) + + consumer = PipeReader( + input_files={"producer" : master_file}, + scheduler=scheduler) + + task_scheduler.add(producer) + task_scheduler.add(consumer) + + def producer_start_cb(task): + os.close(slave_fd) + + producer.addStartListener(producer_start_cb) + task_scheduler.run() + + self.assertEqual(test_string, consumer.getvalue()) |