From 12c795d81251b03f3d33d02696dc8db1299dee78 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Wed, 9 Jan 2013 06:38:16 -0800 Subject: Add emirrordist, a tool for mirroring distfiles. Special thanks to Brian Harring, author of the mirror-dist program from which emirrordist is derived. --- bin/emirrordist | 13 + man/emirrordist.1 | 143 +++++++ pym/portage/_emirrordist/Config.py | 132 ++++++ pym/portage/_emirrordist/DeletionIterator.py | 83 ++++ pym/portage/_emirrordist/DeletionTask.py | 129 ++++++ pym/portage/_emirrordist/FetchIterator.py | 132 ++++++ pym/portage/_emirrordist/FetchTask.py | 615 +++++++++++++++++++++++++++ pym/portage/_emirrordist/MirrorDistTask.py | 218 ++++++++++ pym/portage/_emirrordist/__init__.py | 2 + pym/portage/_emirrordist/main.py | 438 +++++++++++++++++++ pym/portage/util/_ShelveUnicodeWrapper.py | 45 ++ pym/portage/util/_async/FileCopier.py | 17 + 12 files changed, 1967 insertions(+) create mode 100755 bin/emirrordist create mode 100644 man/emirrordist.1 create mode 100644 pym/portage/_emirrordist/Config.py create mode 100644 pym/portage/_emirrordist/DeletionIterator.py create mode 100644 pym/portage/_emirrordist/DeletionTask.py create mode 100644 pym/portage/_emirrordist/FetchIterator.py create mode 100644 pym/portage/_emirrordist/FetchTask.py create mode 100644 pym/portage/_emirrordist/MirrorDistTask.py create mode 100644 pym/portage/_emirrordist/__init__.py create mode 100644 pym/portage/_emirrordist/main.py create mode 100644 pym/portage/util/_ShelveUnicodeWrapper.py create mode 100644 pym/portage/util/_async/FileCopier.py diff --git a/bin/emirrordist b/bin/emirrordist new file mode 100755 index 000000000..8d93de9c0 --- /dev/null +++ b/bin/emirrordist @@ -0,0 +1,13 @@ +#!/usr/bin/python +# Copyright 2013 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import sys + +import portage +portage._internal_caller = True +portage._disable_legacy_globals() +from portage._emirrordist.main import emirrordist_main + +if __name__ == "__main__": + sys.exit(emirrordist_main(sys.argv[1:])) diff --git a/man/emirrordist.1 b/man/emirrordist.1 new file mode 100644 index 000000000..6d020770e --- /dev/null +++ b/man/emirrordist.1 @@ -0,0 +1,143 @@ +.TH "EMIRRORDIST" "1" "Jan 2013" "Portage VERSION" "Portage" +.SH "NAME" +emirrordist \- a fetch tool for mirroring of package distfiles +.SH SYNOPSIS +.B emirrordist +[\fIoptions\fR] \fI\fR +.SH ACTIONS +.TP +\fB\-h\fR, \fB\-\-help\fR +Show a help message and exit. +.TP +\fB\-\-version\fR +Display portage version and exit. +.TP +\fB\-\-mirror\fR +Mirror distfiles for the selected repository. +.SH OPTIONS +.TP +\fB\-\-dry\-run\fR +Perform a trial run with no changes made (typically combined +with \fI\-v\fR or \fI\-vv\fR). +.TP +\fB\-v\fR, \fB\-\-verbose\fR +Display extra information on stderr (multiple occurences +increase verbosity). +.TP +\fB\-\-ignore\-default\-opts\fR +Do not use the \fIEMIRRORDIST_DEFAULT_OPTS\fR environment +variable. +.TP +\fB\-\-distfiles\fR=\fIDIR\fR +Distfiles directory to use (required). +.TP +\fB\-j\fR JOBS, \fB\-\-jobs\fR=\fIJOBS\fR +Number of concurrent jobs to run. +.TP +\fB\-l\fR LOAD, \fB\-\-load\-average\fR=\fILOAD\fR +Load average limit for spawning of new concurrent jobs. +.TP +\fB\-\-tries\fR=\fITRIES\fR +Maximum number of tries per file, 0 means unlimited +(default is 10). +.TP +\fB\-\-repo\fR=\fIREPO\fR +Name of repo to operate on (default repo is located at +$PORTDIR). +.TP +\fB\-\-config\-root\fR=\fIDIR\fR +Location of portage config files. +.TP +\fB\-\-portdir\fR=\fIDIR\fR +Override the portage tree location. +.TP +\fB\-\-portdir\-overlay\fR=\fIPORTDIR_OVERLAY\fR +Override the PORTDIR_OVERLAY variable (requires that +\fI\-\-repo\fR is also specified). +.TP +\fB\-\-strict\-manifests=\fR +Manually override "strict" FEATURES setting. +.TP +\fB\-\-failure\-log\fR=\fIFILE\fR +Log file for fetch failures, with tab\-delimited output, for +reporting purposes. Opened in append mode. +.TP +\fB\-\-success\-log\fR=\fIFILE\fR +Log file for fetch successes, with tab\-delimited output, for +reporting purposes. Opened in append mode. +.TP +\fB\-\-scheduled\-deletion\-log\fR=\fIFILE\fR +Log file for scheduled deletions, with tab\-delimited output, for +reporting purposes. Overwritten with each run. +.TP +\fB\-\-delete\fR +Enable deletion of unused distfiles. +.TP +\fB\-\-deletion\-db\fR=\fIFILE\fR +Database file used to track lifetime of files scheduled for +delayed deletion. +.TP +\fB\-\-deletion\-delay\fR=\fISECONDS\fR +Delay time for deletion of unused distfiles, measured in seconds. +.TP +\fB\-\-temp\-dir\fR=\fIDIR\fR +Temporary directory for downloads. +.TP +\fB\-\-mirror\-overrides\fR=\fIFILE\fR +File holding a list of mirror overrides. +.TP +\fB\-\-mirror\-skip\fR=\fIMIRROR_SKIP\fR +Comma delimited list of mirror targets to skip when +fetching. +.TP +\fB\-\-restrict\-mirror\-exemptions\fR=\fIRESTRICT_MIRROR_EXEMPTIONS\fR +Comma delimited list of mirror targets for which to ignore +RESTRICT="mirror" (see \fBebuild\fR(5)). +.TP +\fB\-\-verify\-existing\-digest\fR +Use digest as a verification of whether existing +distfiles are valid. +.TP +\fB\-\-distfiles\-local\fR=\fIDIR\fR +The distfiles\-local directory to use. +.TP +\fB\-\-distfiles\-db\fR=\fIFILE\fR +Database file used to track which ebuilds a distfile belongs to. +.TP +\fB\-\-recycle\-dir\fR=\fIDIR\fR +Directory for extended retention of files that are removed from +distdir with the \-\-delete option. These files may be be recycled if +they are needed again, instead of downloading them again. +.TP +\fB\-\-recycle\-db\fR=\fIFILE\fR +Database file used to track lifetime of files in recycle dir. +.TP +\fB\-\-recycle\-deletion\-delay\fR=\fISECONDS\fR +Delay time for deletion of unused files from recycle dir, +measured in seconds (defaults to the equivalent of 60 days). +.TP +\fB\-\-fetch\-log\-dir\fR=\fIDIR\fR +Directory for individual fetch logs. +.TP +\fB\-\-whitelist\-from\fR=\fIFILE\fR +Specifies a file containing a list of files to whitelist, one per line, +# prefixed lines ignored. Use this option multiple times in order to +specify multiple whitelists. +.SH "REPORTING BUGS" +Please report bugs via http://bugs.gentoo.org/ +.SH "THANKS" +Special thanks to Brian Harring, author of the mirror\-dist program from +which emirrordist is derived. +.SH "AUTHORS" +.nf +Zac Medico +.fi +.SH "FILES" +.TP +.B /etc/portage/make.conf +Contains variables. +.SH "SEE ALSO" +.BR ebuild (5), +.BR egencache (1), +.BR make.conf (5), +.BR portage (5) diff --git a/pym/portage/_emirrordist/Config.py b/pym/portage/_emirrordist/Config.py new file mode 100644 index 000000000..db4bfebd4 --- /dev/null +++ b/pym/portage/_emirrordist/Config.py @@ -0,0 +1,132 @@ +# Copyright 2013 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import copy +import io +import logging +import shelve +import sys +import time + +import portage +from portage import os +from portage.util import grabdict, grablines +from portage.util._ShelveUnicodeWrapper import ShelveUnicodeWrapper + +class Config(object): + def __init__(self, options, portdb, event_loop): + self.options = options + self.portdb = portdb + self.event_loop = event_loop + self.added_byte_count = 0 + self.added_file_count = 0 + self.scheduled_deletion_count = 0 + self.delete_count = 0 + self.file_owners = {} + self.file_failures = {} + self.start_time = time.time() + self._open_files = [] + + self.log_success = self._open_log('success', options.success_log, 'a') + self.log_failure = self._open_log('failure', options.failure_log, 'a') + + self.distfiles = None + if options.distfiles is not None: + self.distfiles = options.distfiles + + self.mirrors = copy.copy(portdb.settings.thirdpartymirrors()) + + if options.mirror_overrides is not None: + self.mirrors.update(grabdict(options.mirror_overrides)) + + if options.mirror_skip is not None: + for x in options.mirror_skip.split(","): + self.mirrors[x] = [] + + self.whitelist = None + if options.whitelist_from is not None: + self.whitelist = set() + for filename in options.whitelist_from: + for line in grablines(filename): + line = line.strip() + if line and not line.startswith("#"): + self.whitelist.add(line) + + self.restrict_mirror_exemptions = None + if options.restrict_mirror_exemptions is not None: + self.restrict_mirror_exemptions = frozenset( + options.restrict_mirror_exemptions.split(",")) + + self.recycle_db = None + if options.recycle_db is not None: + self.recycle_db = self._open_shelve( + options.recycle_db, 'recycle') + + self.distfiles_db = None + if options.distfiles_db is not None: + self.distfiles_db = self._open_shelve( + options.distfiles_db, 'distfiles') + + self.deletion_db = None + if options.deletion_db is not None: + self.deletion_db = self._open_shelve( + options.deletion_db, 'deletion') + + def _open_log(self, log_desc, log_path, mode): + + if log_path is None or self.options.dry_run: + log_func = logging.info + line_format = "%s: %%s" % log_desc + add_newline = False + if log_path is not None: + logging.warn(("dry-run: %s log " + "redirected to logging.info") % log_desc) + else: + self._open_files.append(io.open(log_path, mode=mode, + encoding='utf_8')) + line_format = "%s\n" + log_func = self._open_files[-1].write + + return self._LogFormatter(line_format, log_func) + + class _LogFormatter(object): + + __slots__ = ('_line_format', '_log_func') + + def __init__(self, line_format, log_func): + self._line_format = line_format + self._log_func = log_func + + def __call__(self, msg): + self._log_func(self._line_format % (msg,)) + + def _open_shelve(self, db_file, db_desc): + if self.options.dry_run: + open_flag = "r" + else: + open_flag = "c" + + if self.options.dry_run and not os.path.exists(db_file): + db = {} + else: + db = shelve.open(db_file, flag=open_flag) + if sys.hexversion < 0x3000000: + db = ShelveUnicodeWrapper(db) + + if self.options.dry_run: + logging.warn("dry-run: %s db opened in readonly mode" % db_desc) + if not isinstance(db, dict): + volatile_db = dict((k, db[k]) for k in db) + db.close() + db = volatile_db + else: + self._open_files.append(db) + + return db + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + while self._open_files: + self._open_files.pop().close() diff --git a/pym/portage/_emirrordist/DeletionIterator.py b/pym/portage/_emirrordist/DeletionIterator.py new file mode 100644 index 000000000..dff52c042 --- /dev/null +++ b/pym/portage/_emirrordist/DeletionIterator.py @@ -0,0 +1,83 @@ +# Copyright 2013 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import logging +import stat + +from portage import os +from .DeletionTask import DeletionTask + +class DeletionIterator(object): + + def __init__(self, config): + self._config = config + + def __iter__(self): + distdir = self._config.options.distfiles + file_owners = self._config.file_owners + whitelist = self._config.whitelist + distfiles_local = self._config.options.distfiles_local + deletion_db = self._config.deletion_db + deletion_delay = self._config.options.deletion_delay + start_time = self._config.start_time + distfiles_set = set(os.listdir(self._config.options.distfiles)) + for filename in distfiles_set: + try: + st = os.stat(os.path.join(distdir, filename)) + except OSError as e: + logging.error("stat failed on '%s' in distfiles: %s\n" % + (filename, e)) + continue + if not stat.S_ISREG(st.st_mode): + continue + elif filename in file_owners: + if deletion_db is not None: + try: + del deletion_db[filename] + except KeyError: + pass + elif whitelist is not None and filename in whitelist: + if deletion_db is not None: + try: + del deletion_db[filename] + except KeyError: + pass + elif distfiles_local is not None and \ + os.path.exists(os.path.join(distfiles_local, filename)): + if deletion_db is not None: + try: + del deletion_db[filename] + except KeyError: + pass + else: + self._config.scheduled_deletion_count += 1 + + if deletion_db is None or deletion_delay is None: + + yield DeletionTask(background=True, + distfile=filename, + config=self._config) + + else: + deletion_entry = deletion_db.get(filename) + + if deletion_entry is None: + logging.debug("add '%s' to deletion db" % filename) + deletion_db[filename] = start_time + + elif deletion_entry + deletion_delay <= start_time: + + yield DeletionTask(background=True, + distfile=filename, + config=self._config) + + if deletion_db is not None: + for filename in list(deletion_db): + if filename not in distfiles_set: + try: + del deletion_db[filename] + except KeyError: + pass + else: + logging.debug("drop '%s' from deletion db" % + filename) diff --git a/pym/portage/_emirrordist/DeletionTask.py b/pym/portage/_emirrordist/DeletionTask.py new file mode 100644 index 000000000..7d10957fa --- /dev/null +++ b/pym/portage/_emirrordist/DeletionTask.py @@ -0,0 +1,129 @@ +# Copyright 2013 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import errno +import logging + +from portage import os +from portage.util._async.FileCopier import FileCopier +from _emerge.CompositeTask import CompositeTask + +class DeletionTask(CompositeTask): + + __slots__ = ('distfile', 'config') + + def _start(self): + + distfile_path = os.path.join( + self.config.options.distfiles, self.distfile) + + if self.config.options.recycle_dir is not None: + distfile_path = os.path.join(self.config.options.distfiles, self.distfile) + recycle_path = os.path.join( + self.config.options.recycle_dir, self.distfile) + if self.config.options.dry_run: + logging.info(("dry-run: move '%s' from " + "distfiles to recycle") % self.distfile) + else: + logging.debug(("move '%s' from " + "distfiles to recycle") % self.distfile) + try: + os.rename(distfile_path, recycle_path) + except OSError as e: + if e.errno != errno.EXDEV: + logging.error(("rename %s from distfiles to " + "recycle failed: %s") % (self.distfile, e)) + else: + self.returncode = os.EX_OK + self._async_wait() + return + + self._start_task( + FileCopier(src_path=distfile_path, + dest_path=recycle_path, + background=False), + self._recycle_copier_exit) + return + + success = True + + if self.config.options.dry_run: + logging.info(("dry-run: delete '%s' from " + "distfiles") % self.distfile) + else: + logging.debug(("delete '%s' from " + "distfiles") % self.distfile) + try: + os.unlink(distfile_path) + except OSError as e: + if e.errno not in (errno.ENOENT, errno.ESTALE): + logging.error("%s unlink failed in distfiles: %s" % + (self.distfile, e)) + success = False + + if success: + self._success() + self.returncode = os.EX_OK + else: + self.returncode = 1 + + self._async_wait() + + def _recycle_copier_exit(self, copier): + + self._assert_current(copier) + if self._was_cancelled(): + self.wait() + return + + success = True + if copier.returncode == os.EX_OK: + + try: + os.unlink(copier.src_path) + except OSError as e: + if e.errno not in (errno.ENOENT, errno.ESTALE): + logging.error("%s unlink failed in distfiles: %s" % + (self.distfile, e)) + success = False + + else: + logging.error(("%s copy from distfiles " + "to recycle failed: %s") % (self.distfile, e)) + success = False + + if success: + self._success() + self.returncode = os.EX_OK + else: + self.returncode = 1 + + self._current_task = None + self.wait() + + def _success(self): + + cpv = "unknown" + if self.config.distfiles_db is not None: + cpv = self.config.distfiles_db.get(self.distfile, cpv) + + self.config.delete_count += 1 + self.config.log_success("%s\t%s\tremoved" % (cpv, self.distfile)) + + if self.config.distfiles_db is not None: + try: + del self.config.distfiles_db[self.distfile] + except KeyError: + pass + else: + logging.debug(("drop '%s' from " + "distfiles db") % self.distfile) + + if self.config.deletion_db is not None: + try: + del self.config.deletion_db[self.distfile] + except KeyError: + pass + else: + logging.debug(("drop '%s' from " + "deletion db") % self.distfile) diff --git a/pym/portage/_emirrordist/FetchIterator.py b/pym/portage/_emirrordist/FetchIterator.py new file mode 100644 index 000000000..cc5d4be00 --- /dev/null +++ b/pym/portage/_emirrordist/FetchIterator.py @@ -0,0 +1,132 @@ +# Copyright 2013 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +from portage import os +from portage.dep import use_reduce +from portage.exception import PortageException +from portage.manifest import Manifest +from .FetchTask import FetchTask + +class FetchIterator(object): + + def __init__(self, config): + self._config = config + self._log_failure = config.log_failure + + def _iter_every_cp(self): + # List categories individually, in order to start yielding quicker, + # and in order to reduce latency in case of a signal interrupt. + cp_all = self._config.portdb.cp_all + for category in sorted(self._config.portdb.categories): + for cp in cp_all(categories=(category,)): + yield cp + + def __iter__(self): + + portdb = self._config.portdb + file_owners = self._config.file_owners + file_failures = self._config.file_failures + restrict_mirror_exemptions = self._config.restrict_mirror_exemptions + + for cp in self._iter_every_cp(): + + for tree in portdb.porttrees: + + # Reset state so the Manifest is pulled once + # for this cp / tree combination. + digests = None + + for cpv in portdb.cp_list(cp, mytree=tree): + + try: + restrict, = portdb.aux_get(cpv, ("RESTRICT",), + mytree=tree) + except (KeyError, PortageException) as e: + self._log_failure("%s\t\taux_get exception %s" % + (cpv, e)) + continue + + # Here we use matchnone=True to ignore conditional parts + # of RESTRICT since they don't apply unconditionally. + # Assume such conditionals only apply on the client side. + try: + restrict = frozenset(use_reduce(restrict, + flat=True, matchnone=True)) + except PortageException as e: + self._log_failure("%s\t\tuse_reduce exception %s" % + (cpv, e)) + continue + + if "fetch" in restrict: + continue + + try: + uri_map = portdb.getFetchMap(cpv) + except PortageException as e: + self._log_failure("%s\t\tgetFetchMap exception %s" % + (cpv, e)) + continue + + if not uri_map: + continue + + if "mirror" in restrict: + skip = False + if restrict_mirror_exemptions is not None: + new_uri_map = {} + for filename, uri_tuple in uri_map.items(): + for uri in uri_tuple: + if uri[:9] == "mirror://": + i = uri.find("/", 9) + if i != -1 and uri[9:i].strip("/") in \ + restrict_mirror_exemptions: + new_uri_map[filename] = uri_tuple + break + if new_uri_map: + uri_map = new_uri_map + else: + skip = True + else: + skip = True + + if skip: + continue + + # Parse Manifest for this cp if we haven't yet. + if digests is None: + try: + digests = Manifest(os.path.join( + tree, cp)).getTypeDigests("DIST") + except (EnvironmentError, PortageException) as e: + for filename in uri_map: + self._log_failure( + "%s\t%s\tManifest exception %s" % + (cpv, filename, e)) + file_failures[filename] = cpv + continue + + if not digests: + for filename in uri_map: + self._log_failure("%s\t%s\tdigest entry missing" % + (cpv, filename)) + file_failures[filename] = cpv + continue + + for filename, uri_tuple in uri_map.items(): + file_digests = digests.get(filename) + if file_digests is None: + self._log_failure("%s\t%s\tdigest entry missing" % + (cpv, filename)) + file_failures[filename] = cpv + continue + if filename in file_owners: + continue + file_owners[filename] = cpv + + yield FetchTask(cpv=cpv, + background=True, + digests=file_digests, + distfile=filename, + restrict=restrict, + uri_tuple=uri_tuple, + config=self._config) diff --git a/pym/portage/_emirrordist/FetchTask.py b/pym/portage/_emirrordist/FetchTask.py new file mode 100644 index 000000000..65fd672b9 --- /dev/null +++ b/pym/portage/_emirrordist/FetchTask.py @@ -0,0 +1,615 @@ +# Copyright 2013 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import collections +import errno +import logging +import stat +import subprocess +import sys + +import portage +from portage import _encodings, _unicode_encode +from portage import os +from portage.exception import PortageException +from portage.util._async.FileCopier import FileCopier +from portage.util._async.FileDigester import FileDigester +from portage.util._async.PipeLogger import PipeLogger +from portage.util._async.PopenProcess import PopenProcess +from _emerge.CompositeTask import CompositeTask + +default_hash_name = portage.const.MANIFEST2_REQUIRED_HASH + +default_fetchcommand = "wget -c -v -t 1 --passive-ftp --timeout=60 -O \"${DISTDIR}/${FILE}\" \"${URI}\"" + +class FetchTask(CompositeTask): + + __slots__ = ('distfile', 'digests', 'config', 'cpv', + 'restrict', 'uri_tuple', '_current_mirror', + '_current_stat', '_fetch_tmp_dir_info', '_fetch_tmp_file', + '_fs_mirror_stack', '_mirror_stack', + '_previously_added', + '_primaryuri_stack', '_log_path', '_tried_uris') + + def _start(self): + + if self.config.options.fetch_log_dir is not None and \ + not self.config.options.dry_run: + self._log_path = os.path.join( + self.config.options.fetch_log_dir, + self.distfile + '.log') + + self._previously_added = True + if self.config.distfiles_db is not None and \ + self.distfile not in self.config.distfiles_db: + self._previously_added = False + self.config.distfiles_db[self.distfile] = self.cpv + + if not self._have_needed_digests(): + msg = "incomplete digests: %s" % " ".join(self.digests) + self.scheduler.output(msg, background=self.background, + log_path=self._log_path) + self.config.log_failure("%s\t%s\t%s" % + (self.cpv, self.distfile, msg)) + self.config.file_failures[self.distfile] = self.cpv + self.returncode = os.EX_OK + self._async_wait() + return + + distfile_path = os.path.join( + self.config.options.distfiles, self.distfile) + + st = None + size_ok = False + try: + st = os.stat(distfile_path) + except OSError as e: + if e.errno not in (errno.ENOENT, errno.ESTALE): + msg = "%s stat failed in %s: %s" % \ + (self.distfile, "distfiles", e) + self.scheduler.output(msg + '\n', background=True, + log_path=self._log_path) + logging.error(msg) + else: + size_ok = st.st_size == self.digests["size"] + + if not size_ok: + if self.config.options.dry_run: + if st is not None: + logging.info(("dry-run: delete '%s' with " + "wrong size from distfiles") % (self.distfile,)) + else: + # Do the unlink in order to ensure that the path is clear, + # even if stat raised ENOENT, since a broken symlink can + # trigger ENOENT. + if self._unlink_file(distfile_path, "distfiles"): + if st is not None: + logging.debug(("delete '%s' with " + "wrong size from distfiles") % (self.distfile,)) + else: + self.config.log_failure("%s\t%s\t%s" % + (self.cpv, self.distfile, "unlink failed in distfiles")) + self.returncode = os.EX_OK + self._async_wait() + return + + if size_ok: + if self.config.options.verify_existing_digest: + self._start_task( + FileDigester(file_path=distfile_path, + hash_names=(self._select_hash(),), + background=self.background, + logfile=self._log_path), self._distfiles_digester_exit) + return + + self._success() + self.returncode = os.EX_OK + self._async_wait() + return + + self._start_fetch() + + def _success(self): + if not self._previously_added: + size = self.digests["size"] + self.config.added_byte_count += size + self.config.added_file_count += 1 + self.config.log_success("%s\t%s\tadded %i bytes" % + (self.cpv, self.distfile, size)) + + if self._log_path is not None: + if not self.config.options.dry_run: + try: + os.unlink(self._log_path) + except OSError: + pass + + if self.config.options.recycle_dir is not None: + + recycle_file = os.path.join( + self.config.options.recycle_dir, self.distfile) + + if self.config.options.dry_run: + if os.path.exists(recycle_file): + logging.info("dry-run: delete '%s' from recycle" % + (self.distfile,)) + else: + try: + os.unlink(recycle_file) + except OSError: + pass + else: + logging.debug("delete '%s' from recycle" % + (self.distfile,)) + + def _distfiles_digester_exit(self, digester): + + self._assert_current(digester) + if self._was_cancelled(): + self.wait() + return + + if self._default_exit(digester) != os.EX_OK: + msg = "%s distfiles digester failed unexpectedly" % \ + (self.distfile,) + self.scheduler.output(msg + '\n', background=True, + log_path=self._log_path) + logging.error(msg) + self.wait() + return + + wrong_digest = self._find_bad_digest(digester.digests) + if wrong_digest is None: + self._success() + self.returncode = os.EX_OK + self.wait() + return + + self._start_fetch() + + _mirror_info = collections.namedtuple('_mirror_info', + 'name location') + + def _start_fetch(self): + + self._previously_added = False + self._fs_mirror_stack = [] + if self.config.options.distfiles_local is not None: + self._fs_mirror_stack.append(self._mirror_info( + 'distfiles-local', self.config.options.distfiles_local)) + if self.config.options.recycle_dir is not None: + self._fs_mirror_stack.append(self._mirror_info( + 'recycle', self.config.options.recycle_dir)) + + self._primaryuri_stack = [] + self._mirror_stack = [] + for uri in reversed(self.uri_tuple): + if uri.startswith('mirror://'): + self._mirror_stack.append( + self._mirror_iterator(uri, self.config.mirrors)) + else: + self._primaryuri_stack.append(uri) + + self._tried_uris = set() + self._try_next_mirror() + + @staticmethod + def _mirror_iterator(uri, mirrors_dict): + + slash_index = uri.find("/", 9) + if slash_index != -1: + mirror_name = uri[9:slash_index].strip("/") + for mirror in mirrors_dict.get(mirror_name, []): + yield mirror.rstrip("/") + "/" + uri[slash_index+1:] + + def _try_next_mirror(self): + if self._fs_mirror_stack: + self._fetch_fs(self._fs_mirror_stack.pop()) + return + else: + uri = self._next_uri() + if uri is not None: + self._tried_uris.add(uri) + self._fetch_uri(uri) + return + + if self._tried_uris: + msg = "all uris failed" + else: + msg = "no fetchable uris" + + self.config.log_failure("%s\t%s\t%s" % + (self.cpv, self.distfile, msg)) + self.config.file_failures[self.distfile] = self.cpv + self.returncode = os.EX_OK + self.wait() + + def _next_uri(self): + remaining_tries = self.config.options.tries - len(self._tried_uris) + if remaining_tries > 0: + + if remaining_tries <= self.config.options.tries / 2: + while self._primaryuri_stack: + uri = self._primaryuri_stack.pop() + if uri not in self._tried_uris: + return uri + + while self._mirror_stack: + uri = next(self._mirror_stack[-1], None) + if uri is None: + self._mirror_stack.pop() + else: + if uri not in self._tried_uris: + return uri + + if self._primaryuri_stack: + return self._primaryuri_stack.pop() + + return None + + def _fetch_fs(self, mirror_info): + file_path = os.path.join(mirror_info.location, self.distfile) + + st = None + size_ok = False + try: + st = os.stat(file_path) + except OSError as e: + if e.errno not in (errno.ENOENT, errno.ESTALE): + msg = "%s stat failed in %s: %s" % \ + (self.distfile, mirror_info.name, e) + self.scheduler.output(msg + '\n', background=True, + log_path=self._log_path) + logging.error(msg) + else: + size_ok = st.st_size == self.digests["size"] + self._current_stat = st + + if size_ok: + self._current_mirror = mirror_info + self._start_task( + FileDigester(file_path=file_path, + hash_names=(self._select_hash(),), + background=self.background, + logfile=self._log_path), + self._fs_mirror_digester_exit) + else: + self._try_next_mirror() + + def _fs_mirror_digester_exit(self, digester): + + self._assert_current(digester) + if self._was_cancelled(): + self.wait() + return + + current_mirror = self._current_mirror + if digester.returncode != os.EX_OK: + msg = "%s %s digester failed unexpectedly" % \ + (self.distfile, current_mirror.name) + self.scheduler.output(msg + '\n', background=True, + log_path=self._log_path) + logging.error(msg) + else: + bad_digest = self._find_bad_digest(digester.digests) + if bad_digest is not None: + msg = "%s %s has bad %s digest: expected %s, got %s" % \ + (self.distfile, current_mirror.name, bad_digest, + self.digests[bad_digest], digester.digests[bad_digest]) + self.scheduler.output(msg + '\n', background=True, + log_path=self._log_path) + logging.error(msg) + elif self.config.options.dry_run: + # Report success without actually touching any files + if self._same_device(current_mirror.location, + self.config.options.distfiles): + logging.info(("dry-run: hardlink '%s' from %s " + "to distfiles") % (self.distfile, current_mirror.name)) + else: + logging.info("dry-run: copy '%s' from %s to distfiles" % + (self.distfile, current_mirror.name)) + self._success() + self.returncode = os.EX_OK + self.wait() + return + else: + src = os.path.join(current_mirror.location, self.distfile) + dest = os.path.join(self.config.options.distfiles, self.distfile) + if self._hardlink_atomic(src, dest, + "%s to %s" % (current_mirror.name, "distfiles")): + logging.debug("hardlink '%s' from %s to distfiles" % + (self.distfile, current_mirror.name)) + self._success() + self.returncode = os.EX_OK + self.wait() + return + else: + self._start_task( + FileCopier(src_path=src, dest_path=dest, + background=(self.background and + self._log_path is not None), + logfile=self._log_path), + self._fs_mirror_copier_exit) + return + + self._try_next_mirror() + + def _fs_mirror_copier_exit(self, copier): + + self._assert_current(copier) + if self._was_cancelled(): + self.wait() + return + + current_mirror = self._current_mirror + if copier.returncode != os.EX_OK: + msg = "%s %s copy failed unexpectedly" % \ + (self.distfile, current_mirror.name) + self.scheduler.output(msg + '\n', background=True, + log_path=self._log_path) + logging.error(msg) + else: + + logging.debug("copy '%s' from %s to distfiles" % + (self.distfile, current_mirror.name)) + + try: + portage.util.apply_stat_permissions( + copier.dest_path, self._current_stat) + except (OSError, PortageException) as e: + msg = ("%s %s apply_stat_permissions " + "failed unexpectedly: %s") % \ + (self.distfile, current_mirror.name, e) + self.scheduler.output(msg + '\n', background=True, + log_path=self._log_path) + logging.error(msg) + + try: + if sys.hexversion >= 0x3030000: + os.utime(copier.dest_path, + ns=(self._current_stat.st_mtime_ns, + self._current_stat.st_mtime_ns)) + else: + os.utime(copier.dest_path, + (self._current_stat[stat.ST_MTIME], + self._current_stat[stat.ST_MTIME])) + except OSError as e: + msg = "%s %s utime failed unexpectedly: %s" % \ + (self.distfile, current_mirror.name, e) + self.scheduler.output(msg + '\n', background=True, + log_path=self._log_path) + logging.error(msg) + + self._success() + self.returncode = os.EX_OK + self.wait() + return + + self._try_next_mirror() + + def _fetch_uri(self, uri): + + if self.config.options.dry_run: + # Simply report success. + logging.info("dry-run: fetch '%s' from '%s'" % + (self.distfile, uri)) + self._success() + self.returncode = os.EX_OK + self.wait() + return + + if self.config.options.temp_dir: + self._fetch_tmp_dir_info = 'temp-dir' + distdir = self.config.options.temp_dir + else: + self._fetch_tmp_dir_info = 'distfiles' + distdir = self.config.options.distfiles + + tmp_basename = self.distfile + '._emirrordist_fetch_.%s' % os.getpid() + + variables = { + "DISTDIR": distdir, + "URI": uri, + "FILE": tmp_basename + } + + self._fetch_tmp_file = os.path.join(distdir, tmp_basename) + + try: + os.unlink(self._fetch_tmp_file) + except OSError: + pass + + args = portage.util.shlex_split(default_fetchcommand) + args = [portage.util.varexpand(x, mydict=variables) + for x in args] + + if sys.hexversion < 0x3000000 or sys.hexversion >= 0x3020000: + # Python 3.1 does not support bytes in Popen args. + args = [_unicode_encode(x, + encoding=_encodings['fs'], errors='strict') for x in args] + + null_fd = os.open(os.devnull, os.O_RDONLY) + fetcher = PopenProcess(background=self.background, + proc=subprocess.Popen(args, stdin=null_fd, + stdout=subprocess.PIPE, stderr=subprocess.STDOUT), + scheduler=self.scheduler) + os.close(null_fd) + + fetcher.pipe_reader = PipeLogger(background=self.background, + input_fd=fetcher.proc.stdout, log_file_path=self._log_path, + scheduler=self.scheduler) + + self._start_task(fetcher, self._fetcher_exit) + + def _fetcher_exit(self, fetcher): + + self._assert_current(fetcher) + if self._was_cancelled(): + self.wait() + return + + if os.path.exists(self._fetch_tmp_file): + self._start_task( + FileDigester(file_path=self._fetch_tmp_file, + hash_names=(self._select_hash(),), + background=self.background, + logfile=self._log_path), + self._fetch_digester_exit) + else: + self._try_next_mirror() + + def _fetch_digester_exit(self, digester): + + self._assert_current(digester) + if self._was_cancelled(): + self.wait() + return + + if digester.returncode != os.EX_OK: + msg = "%s %s digester failed unexpectedly" % \ + (self.distfile, self._fetch_tmp_dir_info) + self.scheduler.output(msg + '\n', background=True, + log_path=self._log_path) + logging.error(msg) + else: + bad_digest = self._find_bad_digest(digester.digests) + if bad_digest is not None: + msg = "%s %s has bad %s digest: expected %s, got %s" % \ + (self.distfile, self._current_mirror.name, bad_digest, + self.digests[bad_digest], digester.digests[bad_digest]) + self.scheduler.output(msg + '\n', background=True, + log_path=self._log_path) + try: + os.unlink(self._fetch_tmp_file) + except OSError: + pass + else: + dest = os.path.join(self.config.options.distfiles, self.distfile) + try: + os.rename(self._fetch_tmp_file, dest) + except OSError: + self._start_task( + FileCopier(src_path=self._fetch_tmp_file, + dest_path=dest, + background=(self.background and + self._log_path is not None), + logfile=self._log_path), + self._fetch_copier_exit) + return + else: + self._success() + self.returncode = os.EX_OK + self.wait() + return + + self._try_next_mirror() + + def _fetch_copier_exit(self, copier): + + self._assert_current(copier) + + try: + os.unlink(self._fetch_tmp_file) + except OSError: + pass + + if self._was_cancelled(): + self.wait() + return + + if copier.returncode == os.EX_OK: + self._success() + self.returncode = os.EX_OK + self.wait() + else: + # out of space? + msg = "%s %s copy failed unexpectedly" % \ + (self.distfile, self._fetch_tmp_dir_info) + self.scheduler.output(msg + '\n', background=True, + log_path=self._log_path) + logging.error(msg) + self.config.log_failure("%s\t%s\t%s" % + (self.cpv, self.distfile, msg)) + self.config.file_failures[self.distfile] = self.cpv + self.returncode = 1 + self.wait() + + def _unlink_file(self, file_path, dir_info): + try: + os.unlink(file_path) + except OSError as e: + if e.errno not in (errno.ENOENT, errno.ESTALE): + msg = "unlink '%s' failed in %s: %s" % \ + (self.distfile, dir_info, e) + self.scheduler.output(msg + '\n', background=True, + log_path=self._log_path) + logging.error(msg) + return False + return True + + def _have_needed_digests(self): + return "size" in self.digests and \ + self._select_hash() is not None + + def _select_hash(self): + if default_hash_name in self.digests: + return default_hash_name + else: + for hash_name in self.digests: + if hash_name != "size" and \ + hash_name in portage.checksum.hashfunc_map: + return hash_name + + return None + + def _find_bad_digest(self, digests): + for hash_name, hash_value in digests.items(): + if self.digests[hash_name] != hash_value: + return hash_name + return None + + @staticmethod + def _same_device(path1, path2): + try: + st1 = os.stat(path1) + st2 = os.stat(path2) + except OSError: + return False + else: + return st1.st_dev == st2.st_dev + + def _hardlink_atomic(self, src, dest, dir_info): + + head, tail = os.path.split(dest) + hardlink_tmp = os.path.join(head, ".%s._mirrordist_hardlink_.%s" % \ + (tail, os.getpid())) + + try: + try: + os.link(src, hardlink_tmp) + except OSError as e: + if e.errno != errno.EXDEV: + msg = "hardlink %s from %s failed: %s" % \ + (self.distfile, dir_info, e) + self.scheduler.output(msg + '\n', background=True, + log_path=self._log_path) + logging.error(msg) + return False + + try: + os.rename(hardlink_tmp, dest) + except OSError as e: + msg = "hardlink rename '%s' from %s failed: %s" % \ + (self.distfile, dir_info, e) + self.scheduler.output(msg + '\n', background=True, + log_path=self._log_path) + logging.error(msg) + return False + finally: + try: + os.unlink(hardlink_tmp) + except OSError: + pass + + return True diff --git a/pym/portage/_emirrordist/MirrorDistTask.py b/pym/portage/_emirrordist/MirrorDistTask.py new file mode 100644 index 000000000..b6f875d4b --- /dev/null +++ b/pym/portage/_emirrordist/MirrorDistTask.py @@ -0,0 +1,218 @@ +# Copyright 2013 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import errno +import logging +import sys +import time + +try: + import threading +except ImportError: + import dummy_threading as threading + +import portage +from portage import os +from portage.util._async.TaskScheduler import TaskScheduler +from _emerge.CompositeTask import CompositeTask +from .FetchIterator import FetchIterator +from .DeletionIterator import DeletionIterator + +if sys.hexversion >= 0x3000000: + long = int + +class MirrorDistTask(CompositeTask): + + __slots__ = ('_config', '_terminated', '_term_check_id') + + def __init__(self, config): + CompositeTask.__init__(self, scheduler=config.event_loop) + self._config = config + self._terminated = threading.Event() + + def _start(self): + self._term_check_id = self.scheduler.idle_add(self._termination_check) + fetch = TaskScheduler(iter(FetchIterator(self._config)), + max_jobs=self._config.options.jobs, + max_load=self._config.options.load_average, + event_loop=self._config.event_loop) + self._start_task(fetch, self._fetch_exit) + + def _fetch_exit(self, fetch): + + self._assert_current(fetch) + if self._was_cancelled(): + self.wait() + return + + if self._config.options.delete: + deletion = TaskScheduler(iter(DeletionIterator(self._config)), + max_jobs=self._config.options.jobs, + max_load=self._config.options.load_average, + event_loop=self._config.event_loop) + self._start_task(deletion, self._deletion_exit) + return + + self._post_deletion() + + def _deletion_exit(self, deletion): + + self._assert_current(deletion) + if self._was_cancelled(): + self.wait() + return + + self._post_deletion() + + def _post_deletion(self): + + if self._config.options.recycle_db is not None: + self._update_recycle_db() + + if self._config.options.scheduled_deletion_log is not None: + self._scheduled_deletion_log() + + self._summary() + + self.returncode = os.EX_OK + self._current_task = None + self.wait() + + def _update_recycle_db(self): + + start_time = self._config.start_time + recycle_dir = self._config.options.recycle_dir + recycle_db = self._config.recycle_db + r_deletion_delay = self._config.options.recycle_deletion_delay + + # Use a dict optimize access. + recycle_db_cache = dict(recycle_db.items()) + + for filename in os.listdir(recycle_dir): + + recycle_file = os.path.join(recycle_dir, filename) + + try: + st = os.stat(recycle_file) + except OSError as e: + if e.errno not in (errno.ENOENT, errno.ESTALE): + logging.error(("stat failed for '%s' in " + "recycle: %s") % (filename, e)) + continue + + value = recycle_db_cache.pop(filename, None) + if value is None: + logging.debug(("add '%s' to " + "recycle db") % filename) + recycle_db[filename] = (st.st_size, start_time) + else: + r_size, r_time = value + if long(r_size) != st.st_size: + recycle_db[filename] = (st.st_size, start_time) + elif r_time + r_deletion_delay < start_time: + if self._config.options.dry_run: + logging.info(("dry-run: delete '%s' from " + "recycle") % filename) + logging.info(("drop '%s' from " + "recycle db") % filename) + else: + try: + os.unlink(recycle_file) + except OSError as e: + if e.errno not in (errno.ENOENT, errno.ESTALE): + logging.error(("delete '%s' from " + "recycle failed: %s") % (filename, e)) + else: + logging.debug(("delete '%s' from " + "recycle") % filename) + try: + del recycle_db[filename] + except KeyError: + pass + else: + logging.debug(("drop '%s' from " + "recycle db") % filename) + + # Existing files were popped from recycle_db_cache, + # so any remaining entries are for files that no + # longer exist. + for filename in recycle_db_cache: + try: + del recycle_db[filename] + except KeyError: + pass + else: + logging.debug(("drop non-existent '%s' from " + "recycle db") % filename) + + def _scheduled_deletion_log(self): + + start_time = self._config.start_time + dry_run = self._config.options.dry_run + deletion_delay = self._config.options.deletion_delay + distfiles_db = self._config.distfiles_db + + date_map = {} + for filename, timestamp in self._config.deletion_db.items(): + date = timestamp + deletion_delay + if date < start_time: + date = start_time + date = time.strftime("%Y-%m-%d", time.gmtime(date)) + date_files = date_map.get(date) + if date_files is None: + date_files = [] + date_map[date] = date_files + date_files.append(filename) + + if dry_run: + logging.warn(("dry-run: scheduled-deletions log " + "will be summarized via logging.info")) + + lines = [] + for date in sorted(date_map): + date_files = date_map[date] + if dry_run: + logging.info(("dry-run: scheduled deletions for %s: %s files") % + (date, len(date_files))) + lines.append("%s\n" % date) + for filename in date_files: + cpv = "unknown" + if distfiles_db is not None: + cpv = distfiles_db.get(filename, cpv) + lines.append("\t%s\t%s\n" % (filename, cpv)) + + if not dry_run: + portage.util.write_atomic( + self._config.options.scheduled_deletion_log, + "".join(lines)) + + def _summary(self): + elapsed_time = time.time() - self._config.start_time + fail_count = len(self._config.file_failures) + delete_count = self._config.delete_count + scheduled_deletion_count = self._config.scheduled_deletion_count - delete_count + added_file_count = self._config.added_file_count + added_byte_count = self._config.added_byte_count + + logging.info("finished in %i seconds" % elapsed_time) + logging.info("failed to fetch %i files" % fail_count) + logging.info("deleted %i files" % delete_count) + logging.info("deletion of %i files scheduled" % + scheduled_deletion_count) + logging.info("added %i files" % added_file_count) + logging.info("added %i bytes total" % added_byte_count) + + def terminate(self): + self._terminated.set() + + def _termination_check(self): + if self._terminated.is_set(): + self.cancel() + self.wait() + return True + + def _wait(self): + CompositeTask._wait(self) + if self._term_check_id is not None: + self.scheduler.source_remove(self._term_check_id) + self._term_check_id = None diff --git a/pym/portage/_emirrordist/__init__.py b/pym/portage/_emirrordist/__init__.py new file mode 100644 index 000000000..6cde9320b --- /dev/null +++ b/pym/portage/_emirrordist/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2013 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 diff --git a/pym/portage/_emirrordist/main.py b/pym/portage/_emirrordist/main.py new file mode 100644 index 000000000..6b7c22ff1 --- /dev/null +++ b/pym/portage/_emirrordist/main.py @@ -0,0 +1,438 @@ +# Copyright 2013 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import logging +import optparse +import sys + +import portage +from portage import os +from portage.util import normalize_path +from portage.util._async.run_main_scheduler import run_main_scheduler +from portage.util._async.SchedulerInterface import SchedulerInterface +from portage.util._eventloop.global_event_loop import global_event_loop +from portage._emirrordist.Config import Config +from .Config import Config +from .MirrorDistTask import MirrorDistTask + +if sys.hexversion >= 0x3000000: + long = int + +seconds_per_day = 24 * 60 * 60 + +common_options = ( + { + "longopt" : "--dry-run", + "help" : "perform a trial run with no changes made (usually combined " + "with --verbose)", + "action" : "store_true" + }, + { + "longopt" : "--verbose", + "shortopt" : "-v", + "help" : "display extra information on stderr " + "(multiple occurences increase verbosity)", + "action" : "count", + "default" : 0, + }, + { + "longopt" : "--ignore-default-opts", + "help" : "do not use the EMIRRORDIST_DEFAULT_OPTS environment variable", + "action" : "store_true" + }, + { + "longopt" : "--distfiles", + "help" : "distfiles directory to use (required)", + "metavar" : "DIR" + }, + { + "longopt" : "--jobs", + "shortopt" : "-j", + "help" : "number of concurrent jobs to run" + }, + { + "longopt" : "--load-average", + "shortopt" : "-l", + "help" : "load average limit for spawning of new concurrent jobs", + "metavar" : "LOAD" + }, + { + "longopt" : "--tries", + "help" : "maximum number of tries per file, 0 means unlimited (default is 10)", + "default" : 10 + }, + { + "longopt" : "--repo", + "help" : "name of repo to operate on (default repo is located at $PORTDIR)" + }, + { + "longopt" : "--config-root", + "help" : "location of portage config files", + "metavar" : "DIR" + }, + { + "longopt" : "--portdir", + "help" : "override the portage tree location", + "metavar" : "DIR" + }, + { + "longopt" : "--portdir-overlay", + "help" : "override the PORTDIR_OVERLAY variable (requires " + "that --repo is also specified)" + }, + { + "longopt" : "--strict-manifests", + "help" : "manually override \"strict\" FEATURES setting", + "choices" : ("y", "n"), + "metavar" : "", + "type" : "choice" + }, + { + "longopt" : "--failure-log", + "help" : "log file for fetch failures, with tab-delimited " + "output, for reporting purposes", + "metavar" : "FILE" + }, + { + "longopt" : "--success-log", + "help" : "log file for fetch successes, with tab-delimited " + "output, for reporting purposes", + "metavar" : "FILE" + }, + { + "longopt" : "--scheduled-deletion-log", + "help" : "log file for scheduled deletions, with tab-delimited " + "output, for reporting purposes", + "metavar" : "FILE" + }, + { + "longopt" : "--delete", + "help" : "enable deletion of unused distfiles", + "action" : "store_true" + }, + { + "longopt" : "--deletion-db", + "help" : "database file used to track lifetime of files " + "scheduled for delayed deletion", + "metavar" : "FILE" + }, + { + "longopt" : "--deletion-delay", + "help" : "delay time for deletion, measured in seconds", + "metavar" : "SECONDS" + }, + { + "longopt" : "--temp-dir", + "help" : "temporary directory for downloads", + "metavar" : "DIR" + }, + { + "longopt" : "--mirror-overrides", + "help" : "file holding a list of mirror overrides", + "metavar" : "FILE" + }, + { + "longopt" : "--mirror-skip", + "help" : "comma delimited list of mirror targets to skip " + "when fetching" + }, + { + "longopt" : "--restrict-mirror-exemptions", + "help" : "comma delimited list of mirror targets for which to " + "ignore RESTRICT=\"mirror\"" + }, + { + "longopt" : "--verify-existing-digest", + "help" : "use digest as a verification of whether existing " + "distfiles are valid", + "action" : "store_true" + }, + { + "longopt" : "--distfiles-local", + "help" : "distfiles-local directory to use", + "metavar" : "DIR" + }, + { + "longopt" : "--distfiles-db", + "help" : "database file used to track which ebuilds a " + "distfile belongs to", + "metavar" : "FILE" + }, + { + "longopt" : "--recycle-dir", + "help" : "directory for extended retention of files that " + "are removed from distdir with the --delete option", + "metavar" : "DIR" + }, + { + "longopt" : "--recycle-db", + "help" : "database file used to track lifetime of files " + "in recycle dir", + "metavar" : "FILE" + }, + { + "longopt" : "--recycle-deletion-delay", + "help" : "delay time for deletion of unused files from " + "recycle dir, measured in seconds (defaults to " + "the equivalent of 60 days)", + "default" : 60 * seconds_per_day, + "metavar" : "SECONDS" + }, + { + "longopt" : "--fetch-log-dir", + "help" : "directory for individual fetch logs", + "metavar" : "DIR" + }, + { + "longopt" : "--whitelist-from", + "help" : "specifies a file containing a list of files to " + "whitelist, one per line, # prefixed lines ignored", + "action" : "append", + "metavar" : "FILE" + }, +) + +def parse_args(args): + description = "emirrordist - a fetch tool for mirroring " \ + "of package distfiles" + usage = "emirrordist [options] " + parser = optparse.OptionParser(description=description, usage=usage) + + actions = optparse.OptionGroup(parser, 'Actions') + actions.add_option("--version", + action="store_true", + help="display portage version and exit") + actions.add_option("--mirror", + action="store_true", + help="mirror distfiles for the selected repository") + parser.add_option_group(actions) + + common = optparse.OptionGroup(parser, 'Common options') + for opt_info in common_options: + opt_pargs = [opt_info["longopt"]] + if opt_info.get("shortopt"): + opt_pargs.append(opt_info["shortopt"]) + opt_kwargs = {"help" : opt_info["help"]} + for k in ("action", "choices", "default", "metavar", "type"): + if k in opt_info: + opt_kwargs[k] = opt_info[k] + common.add_option(*opt_pargs, **opt_kwargs) + parser.add_option_group(common) + + options, args = parser.parse_args(args) + + return (parser, options, args) + +def emirrordist_main(args): + + parser, options, args = parse_args(args) + + if options.version: + sys.stdout.write("Portage %s\n" % portage.VERSION) + return os.EX_OK + + config_root = options.config_root + + # The calling environment is ignored, so the program is + # completely controlled by commandline arguments. + env = {} + + if options.repo is None: + env['PORTDIR_OVERLAY'] = '' + elif options.portdir_overlay: + env['PORTDIR_OVERLAY'] = options.portdir_overlay + + if options.portdir is not None: + env['PORTDIR'] = options.portdir + + settings = portage.config(config_root=config_root, + local_config=False, env=env) + + default_opts = None + if not options.ignore_default_opts: + default_opts = settings.get('EMIRRORDIST_DEFAULT_OPTS', '').split() + + if default_opts: + parser, options, args = parse_args(default_opts + args) + + settings = portage.config(config_root=config_root, + local_config=False, env=env) + + repo_path = None + if options.repo is not None: + repo_path = settings.repositories.treemap.get(options.repo) + if repo_path is None: + parser.error("Unable to locate repository named '%s'" % \ + (options.repo,)) + else: + repo_path = settings.repositories.mainRepoLocation() + if not repo_path: + parser.error("PORTDIR is undefined") + + if options.jobs is not None: + options.jobs = int(options.jobs) + + if options.load_average is not None: + options.load_average = float(options.load_average) + + if options.failure_log is not None: + options.failure_log = normalize_path( + os.path.abspath(options.failure_log)) + + parent_dir = os.path.dirname(options.failure_log) + if not (os.path.isdir(parent_dir) and + os.access(parent_dir, os.W_OK|os.X_OK)): + parser.error(("--failure-log '%s' parent is not a " + "writable directory") % options.failure_log) + + if options.success_log is not None: + options.success_log = normalize_path( + os.path.abspath(options.success_log)) + + parent_dir = os.path.dirname(options.success_log) + if not (os.path.isdir(parent_dir) and + os.access(parent_dir, os.W_OK|os.X_OK)): + parser.error(("--success-log '%s' parent is not a " + "writable directory") % options.success_log) + + if options.scheduled_deletion_log is not None: + options.scheduled_deletion_log = normalize_path( + os.path.abspath(options.scheduled_deletion_log)) + + parent_dir = os.path.dirname(options.scheduled_deletion_log) + if not (os.path.isdir(parent_dir) and + os.access(parent_dir, os.W_OK|os.X_OK)): + parser.error(("--scheduled-deletion-log '%s' parent is not a " + "writable directory") % options.scheduled_deletion_log) + + if options.deletion_db is None: + parser.error("--scheduled-deletion-log requires --deletion-db") + + if options.deletion_delay is not None: + options.deletion_delay = long(options.deletion_delay) + if options.deletion_db is None: + parser.error("--deletion-delay requires --deletion-db") + + if options.deletion_db is not None: + if options.deletion_delay is None: + parser.error("--deletion-db requires --deletion-delay") + options.deletion_db = normalize_path( + os.path.abspath(options.deletion_db)) + + if options.temp_dir is not None: + options.temp_dir = normalize_path( + os.path.abspath(options.temp_dir)) + + if not (os.path.isdir(options.temp_dir) and + os.access(options.temp_dir, os.W_OK|os.X_OK)): + parser.error(("--temp-dir '%s' is not a " + "writable directory") % options.temp_dir) + + if options.distfiles is not None: + options.distfiles = normalize_path( + os.path.abspath(options.distfiles)) + + if not (os.path.isdir(options.distfiles) and + os.access(options.distfiles, os.W_OK|os.X_OK)): + parser.error(("--distfiles '%s' is not a " + "writable directory") % options.distfiles) + else: + parser.error("missing required --distfiles parameter") + + if options.mirror_overrides is not None: + options.mirror_overrides = normalize_path( + os.path.abspath(options.mirror_overrides)) + + if not (os.access(options.mirror_overrides, os.R_OK) and + os.path.isfile(options.mirror_overrides)): + parser.error( + "--mirror-overrides-file '%s' is not a readable file" % + options.mirror_overrides) + + if options.distfiles_local is not None: + options.distfiles_local = normalize_path( + os.path.abspath(options.distfiles_local)) + + if not (os.path.isdir(options.distfiles_local) and + os.access(options.distfiles_local, os.W_OK|os.X_OK)): + parser.error(("--distfiles-local '%s' is not a " + "writable directory") % options.distfiles_local) + + if options.distfiles_db is not None: + options.distfiles_db = normalize_path( + os.path.abspath(options.distfiles_db)) + + if options.tries is not None: + options.tries = int(options.tries) + + if options.recycle_dir is not None: + options.recycle_dir = normalize_path( + os.path.abspath(options.recycle_dir)) + if not (os.path.isdir(options.recycle_dir) and + os.access(options.recycle_dir, os.W_OK|os.X_OK)): + parser.error(("--recycle-dir '%s' is not a " + "writable directory") % options.recycle_dir) + + if options.recycle_db is not None: + if options.recycle_dir is None: + parser.error("--recycle-db requires " + "--recycle-dir to be specified") + options.recycle_db = normalize_path( + os.path.abspath(options.recycle_db)) + + if options.recycle_deletion_delay is not None: + options.recycle_deletion_delay = \ + long(options.recycle_deletion_delay) + + if options.fetch_log_dir is not None: + options.fetch_log_dir = normalize_path( + os.path.abspath(options.fetch_log_dir)) + + if not (os.path.isdir(options.fetch_log_dir) and + os.access(options.fetch_log_dir, os.W_OK|os.X_OK)): + parser.error(("--fetch-log-dir '%s' is not a " + "writable directory") % options.fetch_log_dir) + + if options.whitelist_from: + normalized_paths = [] + for x in options.whitelist_from: + path = normalize_path(os.path.abspath(x)) + normalized_paths.append(path) + if not (os.access(path, os.R_OK) and os.path.isfile(path)): + parser.error( + "--whitelist-from '%s' is not a readable file" % x) + options.whitelist_from = normalized_paths + + if options.strict_manifests is not None: + if options.strict_manifests == "y": + settings.features.add("strict") + else: + settings.features.discard("strict") + + settings.lock() + + portdb = portage.portdbapi(mysettings=settings) + + # Limit ebuilds to the specified repo. + portdb.porttrees = [repo_path] + + portage.util.initialize_logger() + + if options.verbose > 0: + l = logging.getLogger() + l.setLevel(l.getEffectiveLevel() - 10 * options.verbose) + + with Config(options, portdb, + SchedulerInterface(global_event_loop())) as config: + + if not options.mirror: + parser.error('No action specified') + + returncode = os.EX_OK + + if options.mirror: + signum = run_main_scheduler(MirrorDistTask(config)) + if signum is not None: + sys.exit(128 + signum) + + return returncode diff --git a/pym/portage/util/_ShelveUnicodeWrapper.py b/pym/portage/util/_ShelveUnicodeWrapper.py new file mode 100644 index 000000000..adbd5199f --- /dev/null +++ b/pym/portage/util/_ShelveUnicodeWrapper.py @@ -0,0 +1,45 @@ +# Copyright 2013 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +class ShelveUnicodeWrapper(object): + """ + Convert unicode to str and back again, since python-2.x shelve + module doesn't support unicode. + """ + def __init__(self, shelve_instance): + self._shelve = shelve_instance + + def _encode(self, s): + if isinstance(s, unicode): + s = s.encode('utf_8') + return s + + def __len__(self): + return len(self._shelve) + + def __contains__(self, k): + return self._encode(k) in self._shelve + + def __iter__(self): + return self._shelve.__iter__() + + def items(self): + return self._shelve.iteritems() + + def __setitem__(self, k, v): + self._shelve[self._encode(k)] = self._encode(v) + + def __getitem__(self, k): + return self._shelve[self._encode(k)] + + def __delitem__(self, k): + del self._shelve[self._encode(k)] + + def get(self, k, *args): + return self._shelve.get(self._encode(k), *args) + + def close(self): + self._shelve.close() + + def clear(self): + self._shelve.clear() diff --git a/pym/portage/util/_async/FileCopier.py b/pym/portage/util/_async/FileCopier.py new file mode 100644 index 000000000..27e5ab4c0 --- /dev/null +++ b/pym/portage/util/_async/FileCopier.py @@ -0,0 +1,17 @@ +# Copyright 2013 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +from portage import os +from portage import shutil +from portage.util._async.ForkProcess import ForkProcess + +class FileCopier(ForkProcess): + """ + Asynchronously copy a file. + """ + + __slots__ = ('src_path', 'dest_path') + + def _run(self): + shutil.copy(self.src_path, self.dest_path) + return os.EX_OK -- cgit v1.2.3-1-g7c22