# Copyright 2012-2013 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2

from portage import os
from _emerge.AsynchronousTask import AsynchronousTask
from _emerge.PollScheduler import PollScheduler


class AsyncScheduler(AsynchronousTask, PollScheduler):
    """Asynchronous task that starts and tracks a stream of sub-tasks.

    Subclasses supply the sub-tasks by implementing ``_next_task``, which
    must raise ``StopIteration`` once nothing is left to hand out. The
    final ``returncode`` is ``os.EX_OK`` when every sub-task exited with
    ``os.EX_OK`` and ``1`` otherwise.

    NOTE(review): ``_terminated``, ``_termination_check``, ``_schedule``,
    ``_can_add_job``, ``_is_work_scheduled``, ``_sched_iface``,
    ``_event_loop`` and ``_loadavg_latency`` are not defined here; they
    are presumably provided by the base classes -- confirm there.
    """

    def __init__(self, max_jobs=None, max_load=None, **kwargs):
        """Initialise both base classes and the scheduler bookkeeping.

        max_jobs -- stored as ``_max_jobs``; ``None`` is replaced by 1.
            ``_start`` also accepts the value ``True`` here.
        max_load -- stored as ``_max_load``; when it is not ``None``,
            ``_start`` may install a periodic reschedule timer.
        kwargs -- forwarded unchanged to ``PollScheduler.__init__``.
        """
        AsynchronousTask.__init__(self)
        PollScheduler.__init__(self, **kwargs)

        self._max_jobs = 1 if max_jobs is None else max_jobs
        self._max_load = max_load
        # Number of sub-tasks that exited with a returncode other than
        # os.EX_OK.
        self._error_count = 0
        # Sub-tasks that have been started and have not exited yet.
        self._running_tasks = set()
        # Cleared once _next_task raises StopIteration.
        self._remaining_tasks = True
        # Event loop source ids registered by _start, removed by _wait.
        self._term_check_id = None
        self._loadavg_check_id = None

    def _poll(self):
        """Finish via ``wait()`` once idle, then return ``returncode``.

        "Idle" means no work is scheduled and scheduling should not
        continue.
        """
        if not self._is_work_scheduled() and not self._keep_scheduling():
            self.wait()
        return self.returncode

    def _cancel(self):
        """Set the termination flag and run the termination check."""
        self._terminated.set()
        self._termination_check()

    def _terminate_tasks(self):
        """Cancel every sub-task that is currently running."""
        # Iterate over a snapshot so the set may change while tasks are
        # being cancelled (_task_exit discards from it).
        for running in tuple(self._running_tasks):
            running.cancel()

    def _next_task(self):
        """Return the next sub-task to start; subclasses must override.

        Implementations raise ``StopIteration`` when no tasks remain
        (see ``_schedule_tasks``).
        """
        raise NotImplementedError(self)

    def _keep_scheduling(self):
        """Tell whether more sub-tasks should be started.

        Falsy once the task supply is exhausted or the termination flag
        has been set.
        """
        if not self._remaining_tasks:
            return self._remaining_tasks
        return not self._terminated.is_set()

    def _running_job_count(self):
        """Return the number of sub-tasks that are currently running."""
        return len(self._running_tasks)

    def _schedule_tasks(self):
        """Start sub-tasks while scheduling is wanted and a job fits."""
        while self._keep_scheduling() and self._can_add_job():
            try:
                job = self._next_task()
            except StopIteration:
                # Supply exhausted; the loop condition is re-evaluated
                # and ends the loop.
                self._remaining_tasks = False
                continue

            self._running_tasks.add(job)
            job.scheduler = self._sched_iface
            job.addExitListener(self._task_exit)
            job.start()

        # Triggers cleanup and exit listeners if there's nothing left
        # to do.
        self.poll()

    def _task_exit(self, task):
        """Exit listener: record the result of *task* and reschedule."""
        self._running_tasks.discard(task)
        failed = task.returncode != os.EX_OK
        if failed:
            self._error_count += 1
        self._schedule()

    def _start(self):
        """Register the event loop sources and do the first scheduling."""
        loop = self._event_loop
        self._term_check_id = loop.idle_add(self._termination_check)

        if self._max_load is not None and self._loadavg_latency is not None:
            if self._max_jobs is True or self._max_jobs > 1:
                # We have to schedule periodically, in case the load
                # average has changed since the last call.
                self._loadavg_check_id = loop.timeout_add(
                    self._loadavg_latency, self._schedule)

        self._schedule()

    def _wait(self):
        """Run the event loop to completion and compute ``returncode``.

        Returns ``1`` if any sub-task failed, ``os.EX_OK`` otherwise.
        """
        # Loop while there are jobs to be scheduled.
        while self._keep_scheduling():
            self._event_loop.iteration()

        # Clean shutdown of previously scheduled jobs. In the case of
        # termination, this allows for basic cleanup such as flushing
        # of buffered output to logs.
        while self._is_work_scheduled():
            self._event_loop.iteration()

        # Unregister whichever sources _start installed, termination
        # check first, and forget their ids.
        for id_attr in ("_term_check_id", "_loadavg_check_id"):
            source_id = getattr(self, id_attr)
            if source_id is not None:
                self._event_loop.source_remove(source_id)
                setattr(self, id_attr, None)

        self.returncode = 1 if self._error_count > 0 else os.EX_OK
        return self.returncode