# Copyright 2013 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2

import collections
import errno
import logging
import stat
import subprocess
import sys

import portage
from portage import _encodings, _unicode_encode
from portage import os
from portage.exception import PortageException
from portage.util._async.FileCopier import FileCopier
from portage.util._async.FileDigester import FileDigester
from portage.util._async.PipeLogger import PipeLogger
from portage.util._async.PopenProcess import PopenProcess
from _emerge.CompositeTask import CompositeTask

default_hash_name = portage.const.MANIFEST2_REQUIRED_HASH

# ${DISTDIR}, ${FILE} and ${URI} are substituted via portage.util.varexpand
# before the command is executed.
default_fetchcommand = "wget -c -v -t 1 --passive-ftp --timeout=60 -O \"${DISTDIR}/${FILE}\" \"${URI}\""


class FetchTask(CompositeTask):
    """
    Fetch a single distfile into the distfiles directory, trying local
    filesystem mirrors first, then remote mirrors / primary URIs, and
    verifying size and digest at every step.

    Attributes (set by the creator before _start):
        distfile  -- file name to fetch
        digests   -- dict of expected digests, including a "size" key
        config    -- shared mirror-dist configuration object
        cpv       -- package cpv this distfile belongs to
        restrict  -- RESTRICT settings (unused here, carried for callers)
        uri_tuple -- candidate URIs (mirror:// entries are expanded)
    """

    __slots__ = ('distfile', 'digests', 'config', 'cpv',
        'restrict', 'uri_tuple', '_current_mirror',
        '_current_stat', '_fetch_tmp_dir_info', '_fetch_tmp_file',
        '_fs_mirror_stack', '_mirror_stack',
        '_previously_added',
        '_primaryuri_stack', '_log_path', '_tried_uris')

    def _start(self):
        """
        Check whether the distfile already exists with the correct size;
        if so, optionally re-verify its digest, otherwise start fetching.
        """
        if self.config.options.fetch_log_dir is not None and \
            not self.config.options.dry_run:
            self._log_path = os.path.join(
                self.config.options.fetch_log_dir,
                self.distfile + '.log')

        # Track whether this distfile was already recorded in the
        # distfiles_db, so byte/file counters are only bumped for new files.
        self._previously_added = True
        if self.config.distfiles_db is not None and \
            self.distfile not in self.config.distfiles_db:
            self._previously_added = False
            self.config.distfiles_db[self.distfile] = self.cpv

        if not self._have_needed_digests():
            msg = "incomplete digests: %s" % " ".join(self.digests)
            self.scheduler.output(msg, background=self.background,
                log_path=self._log_path)
            self.config.log_failure("%s\t%s\t%s" %
                (self.cpv, self.distfile, msg))
            self.config.file_failures[self.distfile] = self.cpv
            self.returncode = os.EX_OK
            self._async_wait()
            return

        distfile_path = os.path.join(
            self.config.options.distfiles, self.distfile)

        st = None
        size_ok = False
        try:
            st = os.stat(distfile_path)
        except OSError as e:
            if e.errno not in (errno.ENOENT, errno.ESTALE):
                msg = "%s stat failed in %s: %s" % \
                    (self.distfile, "distfiles", e)
                self.scheduler.output(msg + '\n', background=True,
                    log_path=self._log_path)
                logging.error(msg)
        else:
            size_ok = st.st_size == self.digests["size"]

        if not size_ok:
            if self.config.options.dry_run:
                if st is not None:
                    logging.info(("dry-run: delete '%s' with "
                        "wrong size from distfiles") % (self.distfile,))
            else:
                # Do the unlink in order to ensure that the path is clear,
                # even if stat raised ENOENT, since a broken symlink can
                # trigger ENOENT.
                if self._unlink_file(distfile_path, "distfiles"):
                    if st is not None:
                        logging.debug(("delete '%s' with "
                            "wrong size from distfiles") % (self.distfile,))
                else:
                    self.config.log_failure("%s\t%s\t%s" %
                        (self.cpv, self.distfile,
                        "unlink failed in distfiles"))
                    self.returncode = os.EX_OK
                    self._async_wait()
                    return

        if size_ok:
            if self.config.options.verify_existing_digest:
                self._start_task(
                    FileDigester(file_path=distfile_path,
                        hash_names=(self._select_hash(),),
                        background=self.background,
                        logfile=self._log_path),
                    self._distfiles_digester_exit)
                return

            self._success()
            self.returncode = os.EX_OK
            self._async_wait()
            return

        self._start_fetch()

    def _success(self):
        """
        Record a successful fetch: update counters for newly added files,
        drop any stale per-file log, and purge the recycle copy.
        """
        if not self._previously_added:
            size = self.digests["size"]
            self.config.added_byte_count += size
            self.config.added_file_count += 1
            self.config.log_success("%s\t%s\tadded %i bytes" %
                (self.cpv, self.distfile, size))

        if self._log_path is not None:
            if not self.config.options.dry_run:
                try:
                    os.unlink(self._log_path)
                except OSError:
                    pass

        if self.config.options.recycle_dir is not None:
            recycle_file = os.path.join(
                self.config.options.recycle_dir, self.distfile)
            if self.config.options.dry_run:
                if os.path.exists(recycle_file):
                    logging.info("dry-run: delete '%s' from recycle" %
                        (self.distfile,))
            else:
                try:
                    os.unlink(recycle_file)
                except OSError:
                    pass
                else:
                    logging.debug("delete '%s' from recycle" %
                        (self.distfile,))

    def _distfiles_digester_exit(self, digester):
        """
        Handle digest verification of a pre-existing distfile: succeed if
        the digest matches, otherwise fall through to a fresh fetch.
        """
        self._assert_current(digester)
        if self._was_cancelled():
            self.wait()
            return

        if self._default_exit(digester) != os.EX_OK:
            # IOError reading file in our main distfiles directory? This
            # is a bad situation which normally does not occur, so
            # skip this file and report it, in order to draw attention
            # from the administrator.
            msg = "%s distfiles digester failed unexpectedly" % \
                (self.distfile,)
            self.scheduler.output(msg + '\n', background=True,
                log_path=self._log_path)
            logging.error(msg)
            self.wait()
            return

        wrong_digest = self._find_bad_digest(digester.digests)
        if wrong_digest is None:
            self._success()
            self.returncode = os.EX_OK
            self.wait()
            return

        self._start_fetch()

    _mirror_info = collections.namedtuple('_mirror_info',
        'name location')

    def _start_fetch(self):
        """
        Build the stacks of fetch sources (local fs mirrors, remote
        mirrors, primary URIs) and start trying them in order.
        """
        self._previously_added = False

        self._fs_mirror_stack = []
        if self.config.options.distfiles_local is not None:
            self._fs_mirror_stack.append(self._mirror_info(
                'distfiles-local', self.config.options.distfiles_local))
        if self.config.options.recycle_dir is not None:
            self._fs_mirror_stack.append(self._mirror_info(
                'recycle', self.config.options.recycle_dir))

        self._primaryuri_stack = []
        self._mirror_stack = []
        for uri in reversed(self.uri_tuple):
            if uri.startswith('mirror://'):
                self._mirror_stack.append(
                    self._mirror_iterator(uri, self.config.mirrors))
            else:
                self._primaryuri_stack.append(uri)

        self._tried_uris = set()
        self._try_next_mirror()

    @staticmethod
    def _mirror_iterator(uri, mirrors_dict):
        """
        Expand a mirror://name/path URI, yielding one concrete URI per
        configured mirror host for "name".
        """
        # Skip the "mirror://" prefix (9 characters) to find the
        # mirror-name / path separator.
        slash_index = uri.find("/", 9)
        if slash_index != -1:
            mirror_name = uri[9:slash_index].strip("/")
            for mirror in mirrors_dict.get(mirror_name, []):
                yield mirror.rstrip("/") + "/" + uri[slash_index+1:]

    def _try_next_mirror(self):
        """
        Try the next fetch source, or record a failure when all sources
        are exhausted.
        """
        if self._fs_mirror_stack:
            self._fetch_fs(self._fs_mirror_stack.pop())
            return
        else:
            uri = self._next_uri()
            if uri is not None:
                self._tried_uris.add(uri)
                self._fetch_uri(uri)
                return

        if self._tried_uris:
            msg = "all uris failed"
        else:
            msg = "no fetchable uris"

        self.config.log_failure("%s\t%s\t%s" %
            (self.cpv, self.distfile, msg))
        self.config.file_failures[self.distfile] = self.cpv
        self.returncode = os.EX_OK
        self.wait()

    def _next_uri(self):
        """
        Pick the next URI to try, preferring mirrors but falling back to
        primary URIs once half of the allowed tries are used up.
        Returns None when no candidates remain.
        """
        remaining_tries = self.config.options.tries - len(self._tried_uris)
        if remaining_tries > 0:

            if remaining_tries <= self.config.options.tries / 2:
                while self._primaryuri_stack:
                    uri = self._primaryuri_stack.pop()
                    if uri not in self._tried_uris:
                        return uri

            while self._mirror_stack:
                uri = next(self._mirror_stack[-1], None)
                if uri is None:
                    self._mirror_stack.pop()
                else:
                    if uri not in self._tried_uris:
                        return uri

            if self._primaryuri_stack:
                return self._primaryuri_stack.pop()

        return None

    def _fetch_fs(self, mirror_info):
        """
        Check a local filesystem mirror for the distfile; if the size
        matches, verify its digest, otherwise move on to the next source.
        """
        file_path = os.path.join(mirror_info.location, self.distfile)

        st = None
        size_ok = False
        try:
            st = os.stat(file_path)
        except OSError as e:
            if e.errno not in (errno.ENOENT, errno.ESTALE):
                msg = "%s stat failed in %s: %s" % \
                    (self.distfile, mirror_info.name, e)
                self.scheduler.output(msg + '\n', background=True,
                    log_path=self._log_path)
                logging.error(msg)
        else:
            size_ok = st.st_size == self.digests["size"]
            self._current_stat = st

        if size_ok:
            self._current_mirror = mirror_info
            self._start_task(
                FileDigester(file_path=file_path,
                    hash_names=(self._select_hash(),),
                    background=self.background,
                    logfile=self._log_path),
                self._fs_mirror_digester_exit)
        else:
            self._try_next_mirror()

    def _fs_mirror_digester_exit(self, digester):
        """
        After digesting a local mirror copy, either hardlink/copy it into
        distfiles (digest OK) or try the next source.
        """
        self._assert_current(digester)
        if self._was_cancelled():
            self.wait()
            return

        current_mirror = self._current_mirror
        if digester.returncode != os.EX_OK:
            msg = "%s %s digester failed unexpectedly" % \
                (self.distfile, current_mirror.name)
            self.scheduler.output(msg + '\n', background=True,
                log_path=self._log_path)
            logging.error(msg)
        else:
            bad_digest = self._find_bad_digest(digester.digests)
            if bad_digest is not None:
                msg = "%s %s has bad %s digest: expected %s, got %s" % \
                    (self.distfile, current_mirror.name, bad_digest,
                    self.digests[bad_digest], digester.digests[bad_digest])
                self.scheduler.output(msg + '\n', background=True,
                    log_path=self._log_path)
                logging.error(msg)
            elif self.config.options.dry_run:
                # Report success without actually touching any files
                if self._same_device(current_mirror.location,
                    self.config.options.distfiles):
                    logging.info(("dry-run: hardlink '%s' from %s "
                        "to distfiles") %
                        (self.distfile, current_mirror.name))
                else:
                    logging.info("dry-run: copy '%s' from %s to distfiles" %
                        (self.distfile, current_mirror.name))
                self._success()
                self.returncode = os.EX_OK
                self.wait()
                return
            else:
                src = os.path.join(current_mirror.location, self.distfile)
                dest = os.path.join(self.config.options.distfiles,
                    self.distfile)
                # Prefer an atomic hardlink (same device); fall back to an
                # asynchronous copy when linking is not possible.
                if self._hardlink_atomic(src, dest,
                    "%s to %s" % (current_mirror.name, "distfiles")):
                    logging.debug("hardlink '%s' from %s to distfiles" %
                        (self.distfile, current_mirror.name))
                    self._success()
                    self.returncode = os.EX_OK
                    self.wait()
                    return
                else:
                    self._start_task(
                        FileCopier(src_path=src, dest_path=dest,
                            background=(self.background and
                                self._log_path is not None),
                            logfile=self._log_path),
                        self._fs_mirror_copier_exit)
                    return

        self._try_next_mirror()

    def _fs_mirror_copier_exit(self, copier):
        """
        Finalize a copy from a local mirror: restore permissions and
        mtime on the destination, then report success; on copy failure
        fall through to the next source.
        """
        self._assert_current(copier)
        if self._was_cancelled():
            self.wait()
            return

        current_mirror = self._current_mirror
        if copier.returncode != os.EX_OK:
            msg = "%s %s copy failed unexpectedly" % \
                (self.distfile, current_mirror.name)
            self.scheduler.output(msg + '\n', background=True,
                log_path=self._log_path)
            logging.error(msg)
        else:

            logging.debug("copy '%s' from %s to distfiles" %
                (self.distfile, current_mirror.name))

            # Apply the timestamp and permissions of the source file,
            # best-effort: failures are logged but do not abort success.
            try:
                portage.util.apply_stat_permissions(
                    copier.dest_path, self._current_stat)
            except (OSError, PortageException) as e:
                msg = ("%s %s apply_stat_permissions "
                    "failed unexpectedly: %s") % \
                    (self.distfile, current_mirror.name, e)
                self.scheduler.output(msg + '\n', background=True,
                    log_path=self._log_path)
                logging.error(msg)

            try:
                if sys.hexversion >= 0x3030000:
                    # Nanosecond resolution requires Python >= 3.3; atime
                    # is deliberately set equal to the source mtime.
                    os.utime(copier.dest_path,
                        ns=(self._current_stat.st_mtime_ns,
                        self._current_stat.st_mtime_ns))
                else:
                    os.utime(copier.dest_path,
                        (self._current_stat[stat.ST_MTIME],
                        self._current_stat[stat.ST_MTIME]))
            except OSError as e:
                msg = "%s %s utime failed unexpectedly: %s" % \
                    (self.distfile, current_mirror.name, e)
                self.scheduler.output(msg + '\n', background=True,
                    log_path=self._log_path)
                logging.error(msg)

            self._success()
            self.returncode = os.EX_OK
            self.wait()
            return

        self._try_next_mirror()

    def _fetch_uri(self, uri):
        """
        Fetch the distfile from a remote URI into a temporary file via
        the fetch command, logging output through a PipeLogger.
        """
        if self.config.options.dry_run:
            # Simply report success.
            logging.info("dry-run: fetch '%s' from '%s'" %
                (self.distfile, uri))
            self._success()
            self.returncode = os.EX_OK
            self.wait()
            return

        if self.config.options.temp_dir:
            self._fetch_tmp_dir_info = 'temp-dir'
            distdir = self.config.options.temp_dir
        else:
            self._fetch_tmp_dir_info = 'distfiles'
            distdir = self.config.options.distfiles

        tmp_basename = self.distfile + '._emirrordist_fetch_.%s' % os.getpid()

        variables = {
            "DISTDIR": distdir,
            "URI": uri,
            "FILE": tmp_basename
        }

        self._fetch_tmp_file = os.path.join(distdir, tmp_basename)

        try:
            os.unlink(self._fetch_tmp_file)
        except OSError:
            pass

        args = portage.util.shlex_split(default_fetchcommand)
        args = [portage.util.varexpand(x, mydict=variables)
            for x in args]

        if sys.hexversion < 0x3000000 or sys.hexversion >= 0x3020000:
            # Python 3.1 does not support bytes in Popen args.
            args = [_unicode_encode(x,
                encoding=_encodings['fs'], errors='strict') for x in args]

        null_fd = os.open(os.devnull, os.O_RDONLY)
        fetcher = PopenProcess(background=self.background,
            proc=subprocess.Popen(args, stdin=null_fd,
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT),
            scheduler=self.scheduler)
        os.close(null_fd)

        fetcher.pipe_reader = PipeLogger(background=self.background,
            input_fd=fetcher.proc.stdout, log_file_path=self._log_path,
            scheduler=self.scheduler)

        self._start_task(fetcher, self._fetcher_exit)

    def _fetcher_exit(self, fetcher):
        """
        After the fetch command exits, digest the temporary file if it
        exists; otherwise try the next source.
        """
        self._assert_current(fetcher)
        if self._was_cancelled():
            self.wait()
            return

        if os.path.exists(self._fetch_tmp_file):
            self._start_task(
                FileDigester(file_path=self._fetch_tmp_file,
                    hash_names=(self._select_hash(),),
                    background=self.background,
                    logfile=self._log_path),
                self._fetch_digester_exit)
        else:
            self._try_next_mirror()

    def _fetch_digester_exit(self, digester):
        """
        Verify the digest of a freshly fetched temporary file; on success
        move it into distfiles (rename, falling back to copy across
        devices), otherwise discard it and try the next source.
        """
        self._assert_current(digester)
        if self._was_cancelled():
            self.wait()
            return

        if digester.returncode != os.EX_OK:
            msg = "%s %s digester failed unexpectedly" % \
                (self.distfile, self._fetch_tmp_dir_info)
            self.scheduler.output(msg + '\n', background=True,
                log_path=self._log_path)
            logging.error(msg)
        else:
            bad_digest = self._find_bad_digest(digester.digests)
            if bad_digest is not None:
                # Bug fix: label the message with the temporary directory
                # info rather than self._current_mirror.name, which is only
                # set on the filesystem-mirror path and may be unset (raising
                # AttributeError) or stale here.
                msg = "%s %s has bad %s digest: expected %s, got %s" % \
                    (self.distfile, self._fetch_tmp_dir_info, bad_digest,
                    self.digests[bad_digest], digester.digests[bad_digest])
                self.scheduler.output(msg + '\n', background=True,
                    log_path=self._log_path)
                logging.error(msg)
                try:
                    os.unlink(self._fetch_tmp_file)
                except OSError:
                    pass
            else:
                dest = os.path.join(self.config.options.distfiles,
                    self.distfile)
                try:
                    os.rename(self._fetch_tmp_file, dest)
                except OSError:
                    # Cross-device rename fails; fall back to an
                    # asynchronous copy.
                    self._start_task(
                        FileCopier(src_path=self._fetch_tmp_file,
                            dest_path=dest,
                            background=(self.background and
                                self._log_path is not None),
                            logfile=self._log_path),
                        self._fetch_copier_exit)
                    return
                else:
                    self._success()
                    self.returncode = os.EX_OK
                    self.wait()
                    return

        self._try_next_mirror()

    def _fetch_copier_exit(self, copier):
        """
        Finish the cross-device copy of a fetched file: clean up the
        temporary file and report success or a hard failure (e.g. out of
        space).
        """
        self._assert_current(copier)

        try:
            os.unlink(self._fetch_tmp_file)
        except OSError:
            pass

        if self._was_cancelled():
            self.wait()
            return

        if copier.returncode == os.EX_OK:
            self._success()
            self.returncode = os.EX_OK
            self.wait()
        else:
            # out of space?
            msg = "%s %s copy failed unexpectedly" % \
                (self.distfile, self._fetch_tmp_dir_info)
            self.scheduler.output(msg + '\n', background=True,
                log_path=self._log_path)
            logging.error(msg)
            self.config.log_failure("%s\t%s\t%s" %
                (self.cpv, self.distfile, msg))
            self.config.file_failures[self.distfile] = self.cpv
            self.returncode = 1
            self.wait()

    def _unlink_file(self, file_path, dir_info):
        """
        Unlink file_path, treating a missing file as success. Returns
        False (after logging) only on an unexpected unlink error.
        """
        try:
            os.unlink(file_path)
        except OSError as e:
            if e.errno not in (errno.ENOENT, errno.ESTALE):
                msg = "unlink '%s' failed in %s: %s" % \
                    (self.distfile, dir_info, e)
                self.scheduler.output(msg + '\n', background=True,
                    log_path=self._log_path)
                logging.error(msg)
                return False
        return True

    def _have_needed_digests(self):
        """Return True if both a size and a usable hash are available."""
        return "size" in self.digests and \
            self._select_hash() is not None

    def _select_hash(self):
        """
        Choose a hash name for verification: the manifest default if
        present, else any supported non-size hash, else None.
        """
        if default_hash_name in self.digests:
            return default_hash_name
        else:
            for hash_name in self.digests:
                if hash_name != "size" and \
                    hash_name in portage.checksum.hashfunc_map:
                    return hash_name

        return None

    def _find_bad_digest(self, digests):
        """
        Return the name of the first digest in digests that mismatches
        the expected value, or None if all match.
        """
        for hash_name, hash_value in digests.items():
            if self.digests[hash_name] != hash_value:
                return hash_name
        return None

    @staticmethod
    def _same_device(path1, path2):
        """Return True if both paths reside on the same device."""
        try:
            st1 = os.stat(path1)
            st2 = os.stat(path2)
        except OSError:
            return False
        else:
            return st1.st_dev == st2.st_dev

    def _hardlink_atomic(self, src, dest, dir_info):
        """
        Atomically hardlink src to dest via a temporary link name.
        Returns True on success; returns False (quietly for EXDEV, with
        logging otherwise) on failure. The temporary link is always
        removed.
        """

        head, tail = os.path.split(dest)
        hardlink_tmp = os.path.join(head, ".%s._mirrordist_hardlink_.%s" % \
            (tail, os.getpid()))

        try:
            try:
                os.link(src, hardlink_tmp)
            except OSError as e:
                # EXDEV (cross-device) is expected and handled by the
                # caller's copy fallback; anything else is reported.
                if e.errno != errno.EXDEV:
                    msg = "hardlink %s from %s failed: %s" % \
                        (self.distfile, dir_info, e)
                    self.scheduler.output(msg + '\n', background=True,
                        log_path=self._log_path)
                    logging.error(msg)
                return False

            try:
                os.rename(hardlink_tmp, dest)
            except OSError as e:
                msg = "hardlink rename '%s' from %s failed: %s" % \
                    (self.distfile, dir_info, e)
                self.scheduler.output(msg + '\n', background=True,
                    log_path=self._log_path)
                logging.error(msg)
                return False
        finally:
            try:
                os.unlink(hardlink_tmp)
            except OSError:
                pass

        return True