From e4b64dd7dc7c2217055f110990b2496b71976681 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Fri, 5 Oct 2012 13:48:53 -0700 Subject: TaskScheduler: inherit AsyncScheduler This allows the QueueScheduler class to be eliminated. --- pym/_emerge/FifoIpcDaemon.py | 2 + pym/_emerge/QueueScheduler.py | 105 ---------------------------- pym/_emerge/TaskScheduler.py | 26 ------- pym/portage/tests/ebuild/test_ipc_daemon.py | 58 +++++++++------ pym/portage/tests/process/test_poll.py | 17 +++-- pym/portage/util/_async/TaskScheduler.py | 22 ++++++ 6 files changed, 68 insertions(+), 162 deletions(-) delete mode 100644 pym/_emerge/QueueScheduler.py delete mode 100644 pym/_emerge/TaskScheduler.py create mode 100644 pym/portage/util/_async/TaskScheduler.py diff --git a/pym/_emerge/FifoIpcDaemon.py b/pym/_emerge/FifoIpcDaemon.py index fcc4ab4b9..de9dc67b1 100644 --- a/pym/_emerge/FifoIpcDaemon.py +++ b/pym/_emerge/FifoIpcDaemon.py @@ -47,6 +47,8 @@ class FifoIpcDaemon(AbstractPollTask): if self.returncode is None: self.returncode = 1 self._unregister() + # notify exit listeners + self.wait() def _wait(self): if self.returncode is not None: diff --git a/pym/_emerge/QueueScheduler.py b/pym/_emerge/QueueScheduler.py deleted file mode 100644 index 206087c7a..000000000 --- a/pym/_emerge/QueueScheduler.py +++ /dev/null @@ -1,105 +0,0 @@ -# Copyright 1999-2012 Gentoo Foundation -# Distributed under the terms of the GNU General Public License v2 - -from _emerge.PollScheduler import PollScheduler - -class QueueScheduler(PollScheduler): - - """ - Add instances of SequentialTaskQueue and then call run(). The - run() method returns when no tasks remain. - """ - - def __init__(self, main=True, max_jobs=None, max_load=None): - PollScheduler.__init__(self, main=main) - - if max_jobs is None: - max_jobs = 1 - - self._max_jobs = max_jobs - self._max_load = max_load - - self._queues = [] - self._schedule_listeners = [] - - def add(self, q): - self._queues.append(q) - - def remove(self, q): - self._queues.remove(q) - - def clear(self): - for q in self._queues: - q.clear() - - def run(self, timeout=None): - - timeout_callback = None - if timeout is not None: - def timeout_callback(): - timeout_callback.timed_out = True - return False - timeout_callback.timed_out = False - timeout_callback.timeout_id = self.sched_iface.timeout_add( - timeout, timeout_callback) - - term_check_id = self.sched_iface.idle_add(self._termination_check) - try: - while not (timeout_callback is not None and - timeout_callback.timed_out): - # We don't have any callbacks to trigger _schedule(), - # so we have to call it explicitly here. - self._schedule() - if self._keep_scheduling(): - self.sched_iface.iteration() - else: - break - - while self._is_work_scheduled() and \ - not (timeout_callback is not None and - timeout_callback.timed_out): - self.sched_iface.iteration() - finally: - self.sched_iface.source_remove(term_check_id) - if timeout_callback is not None: - self.sched_iface.unregister(timeout_callback.timeout_id) - - def _schedule_tasks(self): - """ - @rtype: bool - @return: True if there may be remaining tasks to schedule, - False otherwise. - """ - if self._terminated_tasks: - return - - while self._can_add_job(): - n = self._max_jobs - self._running_job_count() - if n < 1: - break - - if not self._start_next_job(n): - return - - def _keep_scheduling(self): - return not self._terminated_tasks and any(self._queues) - - def _running_job_count(self): - job_count = 0 - for q in self._queues: - job_count += len(q.running_tasks) - self._jobs = job_count - return job_count - - def _start_next_job(self, n=1): - started_count = 0 - for q in self._queues: - initial_job_count = len(q.running_tasks) - q.schedule() - final_job_count = len(q.running_tasks) - if final_job_count > initial_job_count: - started_count += (final_job_count - initial_job_count) - if started_count >= n: - break - return started_count - diff --git a/pym/_emerge/TaskScheduler.py b/pym/_emerge/TaskScheduler.py deleted file mode 100644 index 583bfe323..000000000 --- a/pym/_emerge/TaskScheduler.py +++ /dev/null @@ -1,26 +0,0 @@ -# Copyright 1999-2012 Gentoo Foundation -# Distributed under the terms of the GNU General Public License v2 - -from _emerge.QueueScheduler import QueueScheduler -from _emerge.SequentialTaskQueue import SequentialTaskQueue - -class TaskScheduler(object): - - """ - A simple way to handle scheduling of AsynchrousTask instances. Simply - add tasks and call run(). The run() method returns when no tasks remain. - """ - - def __init__(self, main=True, max_jobs=None, max_load=None): - self._queue = SequentialTaskQueue(max_jobs=max_jobs) - self._scheduler = QueueScheduler(main=main, - max_jobs=max_jobs, max_load=max_load) - self.sched_iface = self._scheduler.sched_iface - self.run = self._scheduler.run - self.clear = self._scheduler.clear - self.wait = self._queue.wait - self._scheduler.add(self._queue) - - def add(self, task): - self._queue.add(task) - diff --git a/pym/portage/tests/ebuild/test_ipc_daemon.py b/pym/portage/tests/ebuild/test_ipc_daemon.py index 77277fe8e..d4328a1d6 100644 --- a/pym/portage/tests/ebuild/test_ipc_daemon.py +++ b/pym/portage/tests/ebuild/test_ipc_daemon.py @@ -1,4 +1,4 @@ -# Copyright 2010-2011 Gentoo Foundation +# Copyright 2010-2012 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 import tempfile @@ -14,10 +14,12 @@ from portage.locks import hardlock_cleanup from portage.package.ebuild._ipc.ExitCommand import ExitCommand from portage.util import ensure_dirs from portage.util._async.ForkProcess import ForkProcess +from portage.util._async.TaskScheduler import TaskScheduler +from portage.util._eventloop.global_event_loop import global_event_loop +from _emerge.PollScheduler import PollScheduler from _emerge.SpawnProcess import SpawnProcess from _emerge.EbuildBuildDir import EbuildBuildDir from _emerge.EbuildIpcDaemon import EbuildIpcDaemon -from _emerge.TaskScheduler import TaskScheduler class SleepProcess(ForkProcess): """ @@ -33,6 +35,7 @@ class IpcDaemonTestCase(TestCase): _SCHEDULE_TIMEOUT = 40000 # 40 seconds def testIpcDaemon(self): + event_loop = global_event_loop() tmpdir = tempfile.mkdtemp() build_dir = None try: @@ -54,9 +57,8 @@ class IpcDaemonTestCase(TestCase): env["__PORTAGE_TEST_HARDLINK_LOCKS"] = \ os.environ["__PORTAGE_TEST_HARDLINK_LOCKS"] - task_scheduler = TaskScheduler(max_jobs=2) build_dir = EbuildBuildDir( - scheduler=task_scheduler.sched_iface, + scheduler=PollScheduler(event_loop=event_loop).sched_iface, settings=env) build_dir.lock() ensure_dirs(env['PORTAGE_BUILDDIR']) @@ -71,26 +73,23 @@ class IpcDaemonTestCase(TestCase): commands = {'exit' : exit_command} daemon = EbuildIpcDaemon(commands=commands, input_fifo=input_fifo, - output_fifo=output_fifo, - scheduler=task_scheduler.sched_iface) + output_fifo=output_fifo) proc = SpawnProcess( args=[BASH_BINARY, "-c", '"$PORTAGE_BIN_PATH"/ebuild-ipc exit %d' % exitcode], - env=env, scheduler=task_scheduler.sched_iface) + env=env) + task_scheduler = TaskScheduler(iter([daemon, proc]), + max_jobs=2, event_loop=event_loop) self.received_command = False def exit_command_callback(): self.received_command = True - task_scheduler.clear() - task_scheduler.wait() + task_scheduler.cancel() exit_command.reply_hook = exit_command_callback start_time = time.time() - task_scheduler.add(daemon) - task_scheduler.add(proc) - task_scheduler.run(timeout=self._SCHEDULE_TIMEOUT) - task_scheduler.clear() - task_scheduler.wait() + self._run(event_loop, task_scheduler, self._SCHEDULE_TIMEOUT) + hardlock_cleanup(env['PORTAGE_BUILDDIR'], remove_all_locks=True) @@ -101,7 +100,7 @@ class IpcDaemonTestCase(TestCase): self.assertEqual(daemon.isAlive(), False) self.assertEqual(exit_command.exitcode, exitcode) - # Intentionally short timeout test for QueueScheduler.run() + # Intentionally short timeout test for EventLoop/AsyncScheduler. # Use a ridiculously long sleep_time_s in case the user's # system is heavily loaded (see bug #436334). sleep_time_s = 600 #600.000 seconds @@ -116,20 +115,18 @@ class IpcDaemonTestCase(TestCase): scheduler=task_scheduler.sched_iface) proc = SleepProcess(seconds=sleep_time_s, scheduler=task_scheduler.sched_iface) + task_scheduler = TaskScheduler(iter([daemon, proc]), + max_jobs=2, event_loop=event_loop) self.received_command = False def exit_command_callback(): self.received_command = True - task_scheduler.clear() - task_scheduler.wait() + task_scheduler.cancel() exit_command.reply_hook = exit_command_callback start_time = time.time() - task_scheduler.add(daemon) - task_scheduler.add(proc) - task_scheduler.run(timeout=short_timeout_ms) - task_scheduler.clear() - task_scheduler.wait() + self._run(event_loop, task_scheduler, short_timeout_ms) + hardlock_cleanup(env['PORTAGE_BUILDDIR'], remove_all_locks=True) @@ -144,3 +141,20 @@ class IpcDaemonTestCase(TestCase): if build_dir is not None: build_dir.unlock() shutil.rmtree(tmpdir) + + def _timeout_callback(self): + self._timed_out = True + + def _run(self, event_loop, task_scheduler, timeout): + self._timed_out = False + timeout_id = event_loop.timeout_add(timeout, self._timeout_callback) + + try: + task_scheduler.start() + while not self._timed_out and task_scheduler.poll() is None: + event_loop.iteration() + if self._timed_out: + task_scheduler.cancel() + task_scheduler.wait() + finally: + event_loop.source_remove(timeout_id) diff --git a/pym/portage/tests/process/test_poll.py b/pym/portage/tests/process/test_poll.py index d6667b4e0..3772d797f 100644 --- a/pym/portage/tests/process/test_poll.py +++ b/pym/portage/tests/process/test_poll.py @@ -4,7 +4,7 @@ from portage import os from portage.tests import TestCase from portage.util._pty import _create_pty_or_pipe -from _emerge.TaskScheduler import TaskScheduler +from portage.util._async.TaskScheduler import TaskScheduler from _emerge.PipeReader import PipeReader from _emerge.SpawnProcess import SpawnProcess @@ -37,25 +37,24 @@ class PipeReaderTestCase(TestCase): # in order to avoid issue 5380 with python3. master_file = os.fdopen(master_fd, 'rb', 0) slave_file = os.fdopen(slave_fd, 'wb', 0) - task_scheduler = TaskScheduler(max_jobs=2) producer = SpawnProcess( args=["bash", "-c", self._echo_cmd % test_string], - env=os.environ, fd_pipes={1:slave_fd}, - scheduler=task_scheduler.sched_iface) - task_scheduler.add(producer) - slave_file.close() + env=os.environ, fd_pipes={1:slave_fd}) consumer = PipeReader( input_files={"producer" : master_file}, - scheduler=task_scheduler.sched_iface, _use_array=self._use_array) + _use_array=self._use_array) - task_scheduler.add(consumer) + task_scheduler = TaskScheduler(iter([producer, consumer]), max_jobs=2) # This will ensure that both tasks have exited, which # is necessary to avoid "ResourceWarning: unclosed file" # warnings since Python 3.2 (and also ensures that we # don't leave any zombie child processes). - task_scheduler.run() + task_scheduler.start() + slave_file.close() + task_scheduler.wait() + self.assertEqual(producer.returncode, os.EX_OK) self.assertEqual(consumer.returncode, os.EX_OK) diff --git a/pym/portage/util/_async/TaskScheduler.py b/pym/portage/util/_async/TaskScheduler.py new file mode 100644 index 000000000..b0ec7af09 --- /dev/null +++ b/pym/portage/util/_async/TaskScheduler.py @@ -0,0 +1,22 @@ +# Copyright 2012 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +from .AsyncScheduler import AsyncScheduler + +class TaskScheduler(AsyncScheduler): + + __slots__ = ('_task_iter',) + + """ + A simple way to handle scheduling of AbstractPollTask instances. Simply + pass a task iterator into the constructor and call start(). Use the + poll, wait, or addExitListener methods to be notified when all of the + tasks have completed. + """ + + def __init__(self, task_iter, **kwargs): + AsyncScheduler.__init__(self, **kwargs) + self._task_iter = task_iter + + def _next_task(self): + return next(self._task_iter) -- cgit v1.2.3-1-g7c22