From 378a30a81042fdc76206b3d183e3403d7214e094 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Wed, 9 Jul 2008 12:33:08 +0000 Subject: * Unify the scheduler's "build" and "extract" queues into a single "jobs" queue. * Add support for logging fetches to /var/log/emerge-fetch.log when --jobs is enabled. Previously this log was only used for the parallel-fetch feature but now it's also used for --jobs. The scheduler's "prefetch" queue has been renamed to "fetch" since it's not exclusively used for parallel-fetch anymore. * Pass the "background" parameter from Binpkg in to the BinpkgFetcher instance, to send output to emerge-fetch.log instead of stdout. svn path=/main/trunk/; revision=10999 --- pym/_emerge/__init__.py | 45 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py index a696e5c2b..c74b5f12e 100644 --- a/pym/_emerge/__init__.py +++ b/pym/_emerge/__init__.py @@ -2541,7 +2541,8 @@ class Binpkg(CompositeTask): pkg = self.pkg pkg_count = self.pkg_count fetcher = BinpkgFetcher(background=self.background, - pkg=self.pkg, scheduler=self.scheduler) + logfile=self.scheduler.fetch.log_file, pkg=self.pkg, + scheduler=self.scheduler) pkg_path = fetcher.pkg_path self._pkg_path = pkg_path @@ -2553,7 +2554,12 @@ class Binpkg(CompositeTask): (pkg_count.curval, pkg_count.maxval, pkg.cpv) self.logger.log(msg, short_msg=short_msg) - self._start_task(fetcher, self._fetcher_exit) + if self.background: + fetcher.addExitListener(self._fetcher_exit) + self._current_task = fetcher + self.scheduler.fetch.schedule(fetcher) + else: + self._start_task(fetcher, self._fetcher_exit) return self._fetcher_exit(fetcher) @@ -7530,6 +7536,9 @@ class SequentialTaskQueue(SlotObject): def add(self, task): self._task_queue.append(task) + def addFront(self, task): + self._task_queue.appendleft(task) + def schedule(self): if not self: @@ -7586,10 +7595,13 @@ class Scheduler(object): _fetch_log = "/var/log/emerge-fetch.log" class _iface_class(SlotObject): - __slots__ = ("register", "schedule") + __slots__ = ("fetch", "register", "schedule") + + class _fetch_iface_class(SlotObject): + __slots__ = ("log_file", "schedule") _task_queues_class = slot_dict_class( - ("build", "extract", "merge", "prefetch",), prefix="") + ("merge", "jobs", "fetch",), prefix="") class _build_opts_class(SlotObject): __slots__ = ("buildpkg", "buildpkgonly", @@ -7640,8 +7652,11 @@ class Scheduler(object): self.curval = 0 self._logger = self._emerge_log_class( xterm_titles=("notitles" not in settings.features)) + fetch_iface = self._fetch_iface_class(log_file=self._fetch_log, + schedule=self._schedule_fetch) self._sched_iface = self._iface_class( - register=self._register, schedule=self._schedule) + fetch=fetch_iface, register=self._register, + schedule=self._schedule) self._poll_event_handlers = {} self._poll_event_handler_ids = {} # Increment id for each new handler. @@ -7656,7 +7671,6 @@ class Scheduler(object): for k in self._task_queues.allowed_keys: setattr(self._task_queues, k, SequentialTaskQueue()) - self._add_task = self._task_queues.prefetch.add self._prefetchers = weakref.WeakValueDictionary() self._pkg_queue = [] self._completed_tasks = set() @@ -7672,6 +7686,7 @@ class Scheduler(object): if max_jobs is None: max_jobs = 1 self._set_max_jobs(max_jobs) + background = self._max_jobs > 1 self._max_load = myopts.get("--load-average") @@ -7695,6 +7710,7 @@ class Scheduler(object): elif len(mergelist) > 1: self._parallel_fetch = True + if background or self._parallel_fetch: # clear out existing fetch log if it exists try: open(self._fetch_log, 'w') @@ -7703,7 +7719,7 @@ class Scheduler(object): def _set_max_jobs(self, max_jobs): self._max_jobs = max_jobs - self._task_queues.build.max_jobs = max_jobs + self._task_queues.jobs.max_jobs = max_jobs def _set_digraph(self, digraph): if self._max_jobs < 2: @@ -7769,6 +7785,13 @@ class Scheduler(object): if pargs: self.status = pargs[0] + def _schedule_fetch(self, fetcher): + """ + Schedule a fetcher on the fetch queue, in order to + serialize access to the fetch log. + """ + self._task_queues.fetch.addFront(fetcher) + def _find_blockers(self, new_pkg): """ Returns a callable which should be called only when @@ -7855,7 +7878,7 @@ class Scheduler(object): for pkg in self._mergelist: prefetcher = self._create_prefetcher(pkg) if prefetcher is not None: - self._add_task(prefetcher) + self._task_queues.fetch.add(prefetcher) prefetchers[pkg] = prefetcher def _create_prefetcher(self, pkg): @@ -8116,7 +8139,7 @@ class Scheduler(object): del pkg_queue[:] self._completed_tasks.clear() self._digraph = None - self._task_queues.prefetch.clear() + self._task_queues.fetch.clear() # discard any failures and return the # exist status of the last one @@ -8239,10 +8262,10 @@ class Scheduler(object): task_queues.merge.add(merge) elif pkg.built: task.addExitListener(self._extract_exit) - task_queues.extract.add(task) + task_queues.jobs.add(task) else: task.addExitListener(self._build_exit) - task_queues.build.add(task) + task_queues.jobs.add(task) while self._jobs: self._schedule_main(wait=True) -- cgit v1.2.3-1-g7c22