summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2013-01-09 06:38:16 -0800
committerZac Medico <zmedico@gentoo.org>2013-01-09 09:09:44 -0800
commit12c795d81251b03f3d33d02696dc8db1299dee78 (patch)
tree9e3a3983bb2fb65064f5b1a056ac4c36280a9e5d
parent97e8df7eaa171d68da6b3e111c419e2a16dfaf4c (diff)
downloadportage-12c795d81251b03f3d33d02696dc8db1299dee78.tar.gz
portage-12c795d81251b03f3d33d02696dc8db1299dee78.tar.bz2
portage-12c795d81251b03f3d33d02696dc8db1299dee78.zip
Add emirrordist, a tool for mirroring distfiles.
Special thanks to Brian Harring, author of the mirror-dist program from which emirrordist is derived.
-rwxr-xr-xbin/emirrordist13
-rw-r--r--man/emirrordist.1143
-rw-r--r--pym/portage/_emirrordist/Config.py132
-rw-r--r--pym/portage/_emirrordist/DeletionIterator.py83
-rw-r--r--pym/portage/_emirrordist/DeletionTask.py129
-rw-r--r--pym/portage/_emirrordist/FetchIterator.py132
-rw-r--r--pym/portage/_emirrordist/FetchTask.py615
-rw-r--r--pym/portage/_emirrordist/MirrorDistTask.py218
-rw-r--r--pym/portage/_emirrordist/__init__.py2
-rw-r--r--pym/portage/_emirrordist/main.py438
-rw-r--r--pym/portage/util/_ShelveUnicodeWrapper.py45
-rw-r--r--pym/portage/util/_async/FileCopier.py17
12 files changed, 1967 insertions, 0 deletions
diff --git a/bin/emirrordist b/bin/emirrordist
new file mode 100755
index 000000000..8d93de9c0
--- /dev/null
+++ b/bin/emirrordist
@@ -0,0 +1,13 @@
+#!/usr/bin/python
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import sys
+
+import portage
+portage._internal_caller = True
+portage._disable_legacy_globals()
+from portage._emirrordist.main import emirrordist_main
+
+if __name__ == "__main__":
+ sys.exit(emirrordist_main(sys.argv[1:]))
diff --git a/man/emirrordist.1 b/man/emirrordist.1
new file mode 100644
index 000000000..6d020770e
--- /dev/null
+++ b/man/emirrordist.1
@@ -0,0 +1,143 @@
+.TH "EMIRRORDIST" "1" "Jan 2013" "Portage VERSION" "Portage"
+.SH "NAME"
+emirrordist \- a fetch tool for mirroring of package distfiles
+.SH SYNOPSIS
+.B emirrordist
+[\fIoptions\fR] \fI<action>\fR
+.SH ACTIONS
+.TP
+\fB\-h\fR, \fB\-\-help\fR
+Show a help message and exit.
+.TP
+\fB\-\-version\fR
+Display portage version and exit.
+.TP
+\fB\-\-mirror\fR
+Mirror distfiles for the selected repository.
+.SH OPTIONS
+.TP
+\fB\-\-dry\-run\fR
+Perform a trial run with no changes made (typically combined
+with \fI\-v\fR or \fI\-vv\fR).
+.TP
+\fB\-v\fR, \fB\-\-verbose\fR
+Display extra information on stderr (multiple occurences
+increase verbosity).
+.TP
+\fB\-\-ignore\-default\-opts\fR
+Do not use the \fIEMIRRORDIST_DEFAULT_OPTS\fR environment
+variable.
+.TP
+\fB\-\-distfiles\fR=\fIDIR\fR
+Distfiles directory to use (required).
+.TP
+\fB\-j\fR JOBS, \fB\-\-jobs\fR=\fIJOBS\fR
+Number of concurrent jobs to run.
+.TP
+\fB\-l\fR LOAD, \fB\-\-load\-average\fR=\fILOAD\fR
+Load average limit for spawning of new concurrent jobs.
+.TP
+\fB\-\-tries\fR=\fITRIES\fR
+Maximum number of tries per file, 0 means unlimited
+(default is 10).
+.TP
+\fB\-\-repo\fR=\fIREPO\fR
+Name of repo to operate on (default repo is located at
+$PORTDIR).
+.TP
+\fB\-\-config\-root\fR=\fIDIR\fR
+Location of portage config files.
+.TP
+\fB\-\-portdir\fR=\fIDIR\fR
+Override the portage tree location.
+.TP
+\fB\-\-portdir\-overlay\fR=\fIPORTDIR_OVERLAY\fR
+Override the PORTDIR_OVERLAY variable (requires that
+\fI\-\-repo\fR is also specified).
+.TP
+\fB\-\-strict\-manifests=\fR<y|n>
+Manually override "strict" FEATURES setting.
+.TP
+\fB\-\-failure\-log\fR=\fIFILE\fR
+Log file for fetch failures, with tab\-delimited output, for
+reporting purposes. Opened in append mode.
+.TP
+\fB\-\-success\-log\fR=\fIFILE\fR
+Log file for fetch successes, with tab\-delimited output, for
+reporting purposes. Opened in append mode.
+.TP
+\fB\-\-scheduled\-deletion\-log\fR=\fIFILE\fR
+Log file for scheduled deletions, with tab\-delimited output, for
+reporting purposes. Overwritten with each run.
+.TP
+\fB\-\-delete\fR
+Enable deletion of unused distfiles.
+.TP
+\fB\-\-deletion\-db\fR=\fIFILE\fR
+Database file used to track lifetime of files scheduled for
+delayed deletion.
+.TP
+\fB\-\-deletion\-delay\fR=\fISECONDS\fR
+Delay time for deletion of unused distfiles, measured in seconds.
+.TP
+\fB\-\-temp\-dir\fR=\fIDIR\fR
+Temporary directory for downloads.
+.TP
+\fB\-\-mirror\-overrides\fR=\fIFILE\fR
+File holding a list of mirror overrides.
+.TP
+\fB\-\-mirror\-skip\fR=\fIMIRROR_SKIP\fR
+Comma delimited list of mirror targets to skip when
+fetching.
+.TP
+\fB\-\-restrict\-mirror\-exemptions\fR=\fIRESTRICT_MIRROR_EXEMPTIONS\fR
+Comma delimited list of mirror targets for which to ignore
+RESTRICT="mirror" (see \fBebuild\fR(5)).
+.TP
+\fB\-\-verify\-existing\-digest\fR
+Use digest as a verification of whether existing
+distfiles are valid.
+.TP
+\fB\-\-distfiles\-local\fR=\fIDIR\fR
+The distfiles\-local directory to use.
+.TP
+\fB\-\-distfiles\-db\fR=\fIFILE\fR
+Database file used to track which ebuilds a distfile belongs to.
+.TP
+\fB\-\-recycle\-dir\fR=\fIDIR\fR
+Directory for extended retention of files that are removed from
+distdir with the \-\-delete option. These files may be be recycled if
+they are needed again, instead of downloading them again.
+.TP
+\fB\-\-recycle\-db\fR=\fIFILE\fR
+Database file used to track lifetime of files in recycle dir.
+.TP
+\fB\-\-recycle\-deletion\-delay\fR=\fISECONDS\fR
+Delay time for deletion of unused files from recycle dir,
+measured in seconds (defaults to the equivalent of 60 days).
+.TP
+\fB\-\-fetch\-log\-dir\fR=\fIDIR\fR
+Directory for individual fetch logs.
+.TP
+\fB\-\-whitelist\-from\fR=\fIFILE\fR
+Specifies a file containing a list of files to whitelist, one per line,
+# prefixed lines ignored. Use this option multiple times in order to
+specify multiple whitelists.
+.SH "REPORTING BUGS"
+Please report bugs via http://bugs.gentoo.org/
+.SH "THANKS"
+Special thanks to Brian Harring, author of the mirror\-dist program from
+which emirrordist is derived.
+.SH "AUTHORS"
+.nf
+Zac Medico <zmedico@gentoo.org>
+.fi
+.SH "FILES"
+.TP
+.B /etc/portage/make.conf
+Contains variables.
+.SH "SEE ALSO"
+.BR ebuild (5),
+.BR egencache (1),
+.BR make.conf (5),
+.BR portage (5)
diff --git a/pym/portage/_emirrordist/Config.py b/pym/portage/_emirrordist/Config.py
new file mode 100644
index 000000000..db4bfebd4
--- /dev/null
+++ b/pym/portage/_emirrordist/Config.py
@@ -0,0 +1,132 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import copy
+import io
+import logging
+import shelve
+import sys
+import time
+
+import portage
+from portage import os
+from portage.util import grabdict, grablines
+from portage.util._ShelveUnicodeWrapper import ShelveUnicodeWrapper
+
+class Config(object):
+ def __init__(self, options, portdb, event_loop):
+ self.options = options
+ self.portdb = portdb
+ self.event_loop = event_loop
+ self.added_byte_count = 0
+ self.added_file_count = 0
+ self.scheduled_deletion_count = 0
+ self.delete_count = 0
+ self.file_owners = {}
+ self.file_failures = {}
+ self.start_time = time.time()
+ self._open_files = []
+
+ self.log_success = self._open_log('success', options.success_log, 'a')
+ self.log_failure = self._open_log('failure', options.failure_log, 'a')
+
+ self.distfiles = None
+ if options.distfiles is not None:
+ self.distfiles = options.distfiles
+
+ self.mirrors = copy.copy(portdb.settings.thirdpartymirrors())
+
+ if options.mirror_overrides is not None:
+ self.mirrors.update(grabdict(options.mirror_overrides))
+
+ if options.mirror_skip is not None:
+ for x in options.mirror_skip.split(","):
+ self.mirrors[x] = []
+
+ self.whitelist = None
+ if options.whitelist_from is not None:
+ self.whitelist = set()
+ for filename in options.whitelist_from:
+ for line in grablines(filename):
+ line = line.strip()
+ if line and not line.startswith("#"):
+ self.whitelist.add(line)
+
+ self.restrict_mirror_exemptions = None
+ if options.restrict_mirror_exemptions is not None:
+ self.restrict_mirror_exemptions = frozenset(
+ options.restrict_mirror_exemptions.split(","))
+
+ self.recycle_db = None
+ if options.recycle_db is not None:
+ self.recycle_db = self._open_shelve(
+ options.recycle_db, 'recycle')
+
+ self.distfiles_db = None
+ if options.distfiles_db is not None:
+ self.distfiles_db = self._open_shelve(
+ options.distfiles_db, 'distfiles')
+
+ self.deletion_db = None
+ if options.deletion_db is not None:
+ self.deletion_db = self._open_shelve(
+ options.deletion_db, 'deletion')
+
+ def _open_log(self, log_desc, log_path, mode):
+
+ if log_path is None or self.options.dry_run:
+ log_func = logging.info
+ line_format = "%s: %%s" % log_desc
+ add_newline = False
+ if log_path is not None:
+ logging.warn(("dry-run: %s log "
+ "redirected to logging.info") % log_desc)
+ else:
+ self._open_files.append(io.open(log_path, mode=mode,
+ encoding='utf_8'))
+ line_format = "%s\n"
+ log_func = self._open_files[-1].write
+
+ return self._LogFormatter(line_format, log_func)
+
+ class _LogFormatter(object):
+
+ __slots__ = ('_line_format', '_log_func')
+
+ def __init__(self, line_format, log_func):
+ self._line_format = line_format
+ self._log_func = log_func
+
+ def __call__(self, msg):
+ self._log_func(self._line_format % (msg,))
+
+ def _open_shelve(self, db_file, db_desc):
+ if self.options.dry_run:
+ open_flag = "r"
+ else:
+ open_flag = "c"
+
+ if self.options.dry_run and not os.path.exists(db_file):
+ db = {}
+ else:
+ db = shelve.open(db_file, flag=open_flag)
+ if sys.hexversion < 0x3000000:
+ db = ShelveUnicodeWrapper(db)
+
+ if self.options.dry_run:
+ logging.warn("dry-run: %s db opened in readonly mode" % db_desc)
+ if not isinstance(db, dict):
+ volatile_db = dict((k, db[k]) for k in db)
+ db.close()
+ db = volatile_db
+ else:
+ self._open_files.append(db)
+
+ return db
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_value, traceback):
+ while self._open_files:
+ self._open_files.pop().close()
diff --git a/pym/portage/_emirrordist/DeletionIterator.py b/pym/portage/_emirrordist/DeletionIterator.py
new file mode 100644
index 000000000..dff52c042
--- /dev/null
+++ b/pym/portage/_emirrordist/DeletionIterator.py
@@ -0,0 +1,83 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import logging
+import stat
+
+from portage import os
+from .DeletionTask import DeletionTask
+
+class DeletionIterator(object):
+
+ def __init__(self, config):
+ self._config = config
+
+ def __iter__(self):
+ distdir = self._config.options.distfiles
+ file_owners = self._config.file_owners
+ whitelist = self._config.whitelist
+ distfiles_local = self._config.options.distfiles_local
+ deletion_db = self._config.deletion_db
+ deletion_delay = self._config.options.deletion_delay
+ start_time = self._config.start_time
+ distfiles_set = set(os.listdir(self._config.options.distfiles))
+ for filename in distfiles_set:
+ try:
+ st = os.stat(os.path.join(distdir, filename))
+ except OSError as e:
+ logging.error("stat failed on '%s' in distfiles: %s\n" %
+ (filename, e))
+ continue
+ if not stat.S_ISREG(st.st_mode):
+ continue
+ elif filename in file_owners:
+ if deletion_db is not None:
+ try:
+ del deletion_db[filename]
+ except KeyError:
+ pass
+ elif whitelist is not None and filename in whitelist:
+ if deletion_db is not None:
+ try:
+ del deletion_db[filename]
+ except KeyError:
+ pass
+ elif distfiles_local is not None and \
+ os.path.exists(os.path.join(distfiles_local, filename)):
+ if deletion_db is not None:
+ try:
+ del deletion_db[filename]
+ except KeyError:
+ pass
+ else:
+ self._config.scheduled_deletion_count += 1
+
+ if deletion_db is None or deletion_delay is None:
+
+ yield DeletionTask(background=True,
+ distfile=filename,
+ config=self._config)
+
+ else:
+ deletion_entry = deletion_db.get(filename)
+
+ if deletion_entry is None:
+ logging.debug("add '%s' to deletion db" % filename)
+ deletion_db[filename] = start_time
+
+ elif deletion_entry + deletion_delay <= start_time:
+
+ yield DeletionTask(background=True,
+ distfile=filename,
+ config=self._config)
+
+ if deletion_db is not None:
+ for filename in list(deletion_db):
+ if filename not in distfiles_set:
+ try:
+ del deletion_db[filename]
+ except KeyError:
+ pass
+ else:
+ logging.debug("drop '%s' from deletion db" %
+ filename)
diff --git a/pym/portage/_emirrordist/DeletionTask.py b/pym/portage/_emirrordist/DeletionTask.py
new file mode 100644
index 000000000..7d10957fa
--- /dev/null
+++ b/pym/portage/_emirrordist/DeletionTask.py
@@ -0,0 +1,129 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import errno
+import logging
+
+from portage import os
+from portage.util._async.FileCopier import FileCopier
+from _emerge.CompositeTask import CompositeTask
+
+class DeletionTask(CompositeTask):
+
+ __slots__ = ('distfile', 'config')
+
+ def _start(self):
+
+ distfile_path = os.path.join(
+ self.config.options.distfiles, self.distfile)
+
+ if self.config.options.recycle_dir is not None:
+ distfile_path = os.path.join(self.config.options.distfiles, self.distfile)
+ recycle_path = os.path.join(
+ self.config.options.recycle_dir, self.distfile)
+ if self.config.options.dry_run:
+ logging.info(("dry-run: move '%s' from "
+ "distfiles to recycle") % self.distfile)
+ else:
+ logging.debug(("move '%s' from "
+ "distfiles to recycle") % self.distfile)
+ try:
+ os.rename(distfile_path, recycle_path)
+ except OSError as e:
+ if e.errno != errno.EXDEV:
+ logging.error(("rename %s from distfiles to "
+ "recycle failed: %s") % (self.distfile, e))
+ else:
+ self.returncode = os.EX_OK
+ self._async_wait()
+ return
+
+ self._start_task(
+ FileCopier(src_path=distfile_path,
+ dest_path=recycle_path,
+ background=False),
+ self._recycle_copier_exit)
+ return
+
+ success = True
+
+ if self.config.options.dry_run:
+ logging.info(("dry-run: delete '%s' from "
+ "distfiles") % self.distfile)
+ else:
+ logging.debug(("delete '%s' from "
+ "distfiles") % self.distfile)
+ try:
+ os.unlink(distfile_path)
+ except OSError as e:
+ if e.errno not in (errno.ENOENT, errno.ESTALE):
+ logging.error("%s unlink failed in distfiles: %s" %
+ (self.distfile, e))
+ success = False
+
+ if success:
+ self._success()
+ self.returncode = os.EX_OK
+ else:
+ self.returncode = 1
+
+ self._async_wait()
+
+ def _recycle_copier_exit(self, copier):
+
+ self._assert_current(copier)
+ if self._was_cancelled():
+ self.wait()
+ return
+
+ success = True
+ if copier.returncode == os.EX_OK:
+
+ try:
+ os.unlink(copier.src_path)
+ except OSError as e:
+ if e.errno not in (errno.ENOENT, errno.ESTALE):
+ logging.error("%s unlink failed in distfiles: %s" %
+ (self.distfile, e))
+ success = False
+
+ else:
+ logging.error(("%s copy from distfiles "
+ "to recycle failed: %s") % (self.distfile, e))
+ success = False
+
+ if success:
+ self._success()
+ self.returncode = os.EX_OK
+ else:
+ self.returncode = 1
+
+ self._current_task = None
+ self.wait()
+
+ def _success(self):
+
+ cpv = "unknown"
+ if self.config.distfiles_db is not None:
+ cpv = self.config.distfiles_db.get(self.distfile, cpv)
+
+ self.config.delete_count += 1
+ self.config.log_success("%s\t%s\tremoved" % (cpv, self.distfile))
+
+ if self.config.distfiles_db is not None:
+ try:
+ del self.config.distfiles_db[self.distfile]
+ except KeyError:
+ pass
+ else:
+ logging.debug(("drop '%s' from "
+ "distfiles db") % self.distfile)
+
+ if self.config.deletion_db is not None:
+ try:
+ del self.config.deletion_db[self.distfile]
+ except KeyError:
+ pass
+ else:
+ logging.debug(("drop '%s' from "
+ "deletion db") % self.distfile)
diff --git a/pym/portage/_emirrordist/FetchIterator.py b/pym/portage/_emirrordist/FetchIterator.py
new file mode 100644
index 000000000..cc5d4be00
--- /dev/null
+++ b/pym/portage/_emirrordist/FetchIterator.py
@@ -0,0 +1,132 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage import os
+from portage.dep import use_reduce
+from portage.exception import PortageException
+from portage.manifest import Manifest
+from .FetchTask import FetchTask
+
+class FetchIterator(object):
+
+ def __init__(self, config):
+ self._config = config
+ self._log_failure = config.log_failure
+
+ def _iter_every_cp(self):
+ # List categories individually, in order to start yielding quicker,
+ # and in order to reduce latency in case of a signal interrupt.
+ cp_all = self._config.portdb.cp_all
+ for category in sorted(self._config.portdb.categories):
+ for cp in cp_all(categories=(category,)):
+ yield cp
+
+ def __iter__(self):
+
+ portdb = self._config.portdb
+ file_owners = self._config.file_owners
+ file_failures = self._config.file_failures
+ restrict_mirror_exemptions = self._config.restrict_mirror_exemptions
+
+ for cp in self._iter_every_cp():
+
+ for tree in portdb.porttrees:
+
+ # Reset state so the Manifest is pulled once
+ # for this cp / tree combination.
+ digests = None
+
+ for cpv in portdb.cp_list(cp, mytree=tree):
+
+ try:
+ restrict, = portdb.aux_get(cpv, ("RESTRICT",),
+ mytree=tree)
+ except (KeyError, PortageException) as e:
+ self._log_failure("%s\t\taux_get exception %s" %
+ (cpv, e))
+ continue
+
+ # Here we use matchnone=True to ignore conditional parts
+ # of RESTRICT since they don't apply unconditionally.
+ # Assume such conditionals only apply on the client side.
+ try:
+ restrict = frozenset(use_reduce(restrict,
+ flat=True, matchnone=True))
+ except PortageException as e:
+ self._log_failure("%s\t\tuse_reduce exception %s" %
+ (cpv, e))
+ continue
+
+ if "fetch" in restrict:
+ continue
+
+ try:
+ uri_map = portdb.getFetchMap(cpv)
+ except PortageException as e:
+ self._log_failure("%s\t\tgetFetchMap exception %s" %
+ (cpv, e))
+ continue
+
+ if not uri_map:
+ continue
+
+ if "mirror" in restrict:
+ skip = False
+ if restrict_mirror_exemptions is not None:
+ new_uri_map = {}
+ for filename, uri_tuple in uri_map.items():
+ for uri in uri_tuple:
+ if uri[:9] == "mirror://":
+ i = uri.find("/", 9)
+ if i != -1 and uri[9:i].strip("/") in \
+ restrict_mirror_exemptions:
+ new_uri_map[filename] = uri_tuple
+ break
+ if new_uri_map:
+ uri_map = new_uri_map
+ else:
+ skip = True
+ else:
+ skip = True
+
+ if skip:
+ continue
+
+ # Parse Manifest for this cp if we haven't yet.
+ if digests is None:
+ try:
+ digests = Manifest(os.path.join(
+ tree, cp)).getTypeDigests("DIST")
+ except (EnvironmentError, PortageException) as e:
+ for filename in uri_map:
+ self._log_failure(
+ "%s\t%s\tManifest exception %s" %
+ (cpv, filename, e))
+ file_failures[filename] = cpv
+ continue
+
+ if not digests:
+ for filename in uri_map:
+ self._log_failure("%s\t%s\tdigest entry missing" %
+ (cpv, filename))
+ file_failures[filename] = cpv
+ continue
+
+ for filename, uri_tuple in uri_map.items():
+ file_digests = digests.get(filename)
+ if file_digests is None:
+ self._log_failure("%s\t%s\tdigest entry missing" %
+ (cpv, filename))
+ file_failures[filename] = cpv
+ continue
+ if filename in file_owners:
+ continue
+ file_owners[filename] = cpv
+
+ yield FetchTask(cpv=cpv,
+ background=True,
+ digests=file_digests,
+ distfile=filename,
+ restrict=restrict,
+ uri_tuple=uri_tuple,
+ config=self._config)
diff --git a/pym/portage/_emirrordist/FetchTask.py b/pym/portage/_emirrordist/FetchTask.py
new file mode 100644
index 000000000..65fd672b9
--- /dev/null
+++ b/pym/portage/_emirrordist/FetchTask.py
@@ -0,0 +1,615 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import collections
+import errno
+import logging
+import stat
+import subprocess
+import sys
+
+import portage
+from portage import _encodings, _unicode_encode
+from portage import os
+from portage.exception import PortageException
+from portage.util._async.FileCopier import FileCopier
+from portage.util._async.FileDigester import FileDigester
+from portage.util._async.PipeLogger import PipeLogger
+from portage.util._async.PopenProcess import PopenProcess
+from _emerge.CompositeTask import CompositeTask
+
+default_hash_name = portage.const.MANIFEST2_REQUIRED_HASH
+
+default_fetchcommand = "wget -c -v -t 1 --passive-ftp --timeout=60 -O \"${DISTDIR}/${FILE}\" \"${URI}\""
+
+class FetchTask(CompositeTask):
+
+ __slots__ = ('distfile', 'digests', 'config', 'cpv',
+ 'restrict', 'uri_tuple', '_current_mirror',
+ '_current_stat', '_fetch_tmp_dir_info', '_fetch_tmp_file',
+ '_fs_mirror_stack', '_mirror_stack',
+ '_previously_added',
+ '_primaryuri_stack', '_log_path', '_tried_uris')
+
+ def _start(self):
+
+ if self.config.options.fetch_log_dir is not None and \
+ not self.config.options.dry_run:
+ self._log_path = os.path.join(
+ self.config.options.fetch_log_dir,
+ self.distfile + '.log')
+
+ self._previously_added = True
+ if self.config.distfiles_db is not None and \
+ self.distfile not in self.config.distfiles_db:
+ self._previously_added = False
+ self.config.distfiles_db[self.distfile] = self.cpv
+
+ if not self._have_needed_digests():
+ msg = "incomplete digests: %s" % " ".join(self.digests)
+ self.scheduler.output(msg, background=self.background,
+ log_path=self._log_path)
+ self.config.log_failure("%s\t%s\t%s" %
+ (self.cpv, self.distfile, msg))
+ self.config.file_failures[self.distfile] = self.cpv
+ self.returncode = os.EX_OK
+ self._async_wait()
+ return
+
+ distfile_path = os.path.join(
+ self.config.options.distfiles, self.distfile)
+
+ st = None
+ size_ok = False
+ try:
+ st = os.stat(distfile_path)
+ except OSError as e:
+ if e.errno not in (errno.ENOENT, errno.ESTALE):
+ msg = "%s stat failed in %s: %s" % \
+ (self.distfile, "distfiles", e)
+ self.scheduler.output(msg + '\n', background=True,
+ log_path=self._log_path)
+ logging.error(msg)
+ else:
+ size_ok = st.st_size == self.digests["size"]
+
+ if not size_ok:
+ if self.config.options.dry_run:
+ if st is not None:
+ logging.info(("dry-run: delete '%s' with "
+ "wrong size from distfiles") % (self.distfile,))
+ else:
+ # Do the unlink in order to ensure that the path is clear,
+ # even if stat raised ENOENT, since a broken symlink can
+ # trigger ENOENT.
+ if self._unlink_file(distfile_path, "distfiles"):
+ if st is not None:
+ logging.debug(("delete '%s' with "
+ "wrong size from distfiles") % (self.distfile,))
+ else:
+ self.config.log_failure("%s\t%s\t%s" %
+ (self.cpv, self.distfile, "unlink failed in distfiles"))
+ self.returncode = os.EX_OK
+ self._async_wait()
+ return
+
+ if size_ok:
+ if self.config.options.verify_existing_digest:
+ self._start_task(
+ FileDigester(file_path=distfile_path,
+ hash_names=(self._select_hash(),),
+ background=self.background,
+ logfile=self._log_path), self._distfiles_digester_exit)
+ return
+
+ self._success()
+ self.returncode = os.EX_OK
+ self._async_wait()
+ return
+
+ self._start_fetch()
+
+ def _success(self):
+ if not self._previously_added:
+ size = self.digests["size"]
+ self.config.added_byte_count += size
+ self.config.added_file_count += 1
+ self.config.log_success("%s\t%s\tadded %i bytes" %
+ (self.cpv, self.distfile, size))
+
+ if self._log_path is not None:
+ if not self.config.options.dry_run:
+ try:
+ os.unlink(self._log_path)
+ except OSError:
+ pass
+
+ if self.config.options.recycle_dir is not None:
+
+ recycle_file = os.path.join(
+ self.config.options.recycle_dir, self.distfile)
+
+ if self.config.options.dry_run:
+ if os.path.exists(recycle_file):
+ logging.info("dry-run: delete '%s' from recycle" %
+ (self.distfile,))
+ else:
+ try:
+ os.unlink(recycle_file)
+ except OSError:
+ pass
+ else:
+ logging.debug("delete '%s' from recycle" %
+ (self.distfile,))
+
+ def _distfiles_digester_exit(self, digester):
+
+ self._assert_current(digester)
+ if self._was_cancelled():
+ self.wait()
+ return
+
+ if self._default_exit(digester) != os.EX_OK:
+ msg = "%s distfiles digester failed unexpectedly" % \
+ (self.distfile,)
+ self.scheduler.output(msg + '\n', background=True,
+ log_path=self._log_path)
+ logging.error(msg)
+ self.wait()
+ return
+
+ wrong_digest = self._find_bad_digest(digester.digests)
+ if wrong_digest is None:
+ self._success()
+ self.returncode = os.EX_OK
+ self.wait()
+ return
+
+ self._start_fetch()
+
+ _mirror_info = collections.namedtuple('_mirror_info',
+ 'name location')
+
+ def _start_fetch(self):
+
+ self._previously_added = False
+ self._fs_mirror_stack = []
+ if self.config.options.distfiles_local is not None:
+ self._fs_mirror_stack.append(self._mirror_info(
+ 'distfiles-local', self.config.options.distfiles_local))
+ if self.config.options.recycle_dir is not None:
+ self._fs_mirror_stack.append(self._mirror_info(
+ 'recycle', self.config.options.recycle_dir))
+
+ self._primaryuri_stack = []
+ self._mirror_stack = []
+ for uri in reversed(self.uri_tuple):
+ if uri.startswith('mirror://'):
+ self._mirror_stack.append(
+ self._mirror_iterator(uri, self.config.mirrors))
+ else:
+ self._primaryuri_stack.append(uri)
+
+ self._tried_uris = set()
+ self._try_next_mirror()
+
+ @staticmethod
+ def _mirror_iterator(uri, mirrors_dict):
+
+ slash_index = uri.find("/", 9)
+ if slash_index != -1:
+ mirror_name = uri[9:slash_index].strip("/")
+ for mirror in mirrors_dict.get(mirror_name, []):
+ yield mirror.rstrip("/") + "/" + uri[slash_index+1:]
+
+ def _try_next_mirror(self):
+ if self._fs_mirror_stack:
+ self._fetch_fs(self._fs_mirror_stack.pop())
+ return
+ else:
+ uri = self._next_uri()
+ if uri is not None:
+ self._tried_uris.add(uri)
+ self._fetch_uri(uri)
+ return
+
+ if self._tried_uris:
+ msg = "all uris failed"
+ else:
+ msg = "no fetchable uris"
+
+ self.config.log_failure("%s\t%s\t%s" %
+ (self.cpv, self.distfile, msg))
+ self.config.file_failures[self.distfile] = self.cpv
+ self.returncode = os.EX_OK
+ self.wait()
+
+ def _next_uri(self):
+ remaining_tries = self.config.options.tries - len(self._tried_uris)
+ if remaining_tries > 0:
+
+ if remaining_tries <= self.config.options.tries / 2:
+ while self._primaryuri_stack:
+ uri = self._primaryuri_stack.pop()
+ if uri not in self._tried_uris:
+ return uri
+
+ while self._mirror_stack:
+ uri = next(self._mirror_stack[-1], None)
+ if uri is None:
+ self._mirror_stack.pop()
+ else:
+ if uri not in self._tried_uris:
+ return uri
+
+ if self._primaryuri_stack:
+ return self._primaryuri_stack.pop()
+
+ return None
+
+ def _fetch_fs(self, mirror_info):
+ file_path = os.path.join(mirror_info.location, self.distfile)
+
+ st = None
+ size_ok = False
+ try:
+ st = os.stat(file_path)
+ except OSError as e:
+ if e.errno not in (errno.ENOENT, errno.ESTALE):
+ msg = "%s stat failed in %s: %s" % \
+ (self.distfile, mirror_info.name, e)
+ self.scheduler.output(msg + '\n', background=True,
+ log_path=self._log_path)
+ logging.error(msg)
+ else:
+ size_ok = st.st_size == self.digests["size"]
+ self._current_stat = st
+
+ if size_ok:
+ self._current_mirror = mirror_info
+ self._start_task(
+ FileDigester(file_path=file_path,
+ hash_names=(self._select_hash(),),
+ background=self.background,
+ logfile=self._log_path),
+ self._fs_mirror_digester_exit)
+ else:
+ self._try_next_mirror()
+
+ def _fs_mirror_digester_exit(self, digester):
+
+ self._assert_current(digester)
+ if self._was_cancelled():
+ self.wait()
+ return
+
+ current_mirror = self._current_mirror
+ if digester.returncode != os.EX_OK:
+ msg = "%s %s digester failed unexpectedly" % \
+ (self.distfile, current_mirror.name)
+ self.scheduler.output(msg + '\n', background=True,
+ log_path=self._log_path)
+ logging.error(msg)
+ else:
+ bad_digest = self._find_bad_digest(digester.digests)
+ if bad_digest is not None:
+ msg = "%s %s has bad %s digest: expected %s, got %s" % \
+ (self.distfile, current_mirror.name, bad_digest,
+ self.digests[bad_digest], digester.digests[bad_digest])
+ self.scheduler.output(msg + '\n', background=True,
+ log_path=self._log_path)
+ logging.error(msg)
+ elif self.config.options.dry_run:
+ # Report success without actually touching any files
+ if self._same_device(current_mirror.location,
+ self.config.options.distfiles):
+ logging.info(("dry-run: hardlink '%s' from %s "
+ "to distfiles") % (self.distfile, current_mirror.name))
+ else:
+ logging.info("dry-run: copy '%s' from %s to distfiles" %
+ (self.distfile, current_mirror.name))
+ self._success()
+ self.returncode = os.EX_OK
+ self.wait()
+ return
+ else:
+ src = os.path.join(current_mirror.location, self.distfile)
+ dest = os.path.join(self.config.options.distfiles, self.distfile)
+ if self._hardlink_atomic(src, dest,
+ "%s to %s" % (current_mirror.name, "distfiles")):
+ logging.debug("hardlink '%s' from %s to distfiles" %
+ (self.distfile, current_mirror.name))
+ self._success()
+ self.returncode = os.EX_OK
+ self.wait()
+ return
+ else:
+ self._start_task(
+ FileCopier(src_path=src, dest_path=dest,
+ background=(self.background and
+ self._log_path is not None),
+ logfile=self._log_path),
+ self._fs_mirror_copier_exit)
+ return
+
+ self._try_next_mirror()
+
+ def _fs_mirror_copier_exit(self, copier):
+
+ self._assert_current(copier)
+ if self._was_cancelled():
+ self.wait()
+ return
+
+ current_mirror = self._current_mirror
+ if copier.returncode != os.EX_OK:
+ msg = "%s %s copy failed unexpectedly" % \
+ (self.distfile, current_mirror.name)
+ self.scheduler.output(msg + '\n', background=True,
+ log_path=self._log_path)
+ logging.error(msg)
+ else:
+
+ logging.debug("copy '%s' from %s to distfiles" %
+ (self.distfile, current_mirror.name))
+
+ try:
+ portage.util.apply_stat_permissions(
+ copier.dest_path, self._current_stat)
+ except (OSError, PortageException) as e:
+ msg = ("%s %s apply_stat_permissions "
+ "failed unexpectedly: %s") % \
+ (self.distfile, current_mirror.name, e)
+ self.scheduler.output(msg + '\n', background=True,
+ log_path=self._log_path)
+ logging.error(msg)
+
+ try:
+ if sys.hexversion >= 0x3030000:
+ os.utime(copier.dest_path,
+ ns=(self._current_stat.st_mtime_ns,
+ self._current_stat.st_mtime_ns))
+ else:
+ os.utime(copier.dest_path,
+ (self._current_stat[stat.ST_MTIME],
+ self._current_stat[stat.ST_MTIME]))
+ except OSError as e:
+ msg = "%s %s utime failed unexpectedly: %s" % \
+ (self.distfile, current_mirror.name, e)
+ self.scheduler.output(msg + '\n', background=True,
+ log_path=self._log_path)
+ logging.error(msg)
+
+ self._success()
+ self.returncode = os.EX_OK
+ self.wait()
+ return
+
+ self._try_next_mirror()
+
+ def _fetch_uri(self, uri):
+
+ if self.config.options.dry_run:
+ # Simply report success.
+ logging.info("dry-run: fetch '%s' from '%s'" %
+ (self.distfile, uri))
+ self._success()
+ self.returncode = os.EX_OK
+ self.wait()
+ return
+
+ if self.config.options.temp_dir:
+ self._fetch_tmp_dir_info = 'temp-dir'
+ distdir = self.config.options.temp_dir
+ else:
+ self._fetch_tmp_dir_info = 'distfiles'
+ distdir = self.config.options.distfiles
+
+ tmp_basename = self.distfile + '._emirrordist_fetch_.%s' % os.getpid()
+
+ variables = {
+ "DISTDIR": distdir,
+ "URI": uri,
+ "FILE": tmp_basename
+ }
+
+ self._fetch_tmp_file = os.path.join(distdir, tmp_basename)
+
+ try:
+ os.unlink(self._fetch_tmp_file)
+ except OSError:
+ pass
+
+ args = portage.util.shlex_split(default_fetchcommand)
+ args = [portage.util.varexpand(x, mydict=variables)
+ for x in args]
+
+ if sys.hexversion < 0x3000000 or sys.hexversion >= 0x3020000:
+ # Python 3.1 does not support bytes in Popen args.
+ args = [_unicode_encode(x,
+ encoding=_encodings['fs'], errors='strict') for x in args]
+
+ null_fd = os.open(os.devnull, os.O_RDONLY)
+ fetcher = PopenProcess(background=self.background,
+ proc=subprocess.Popen(args, stdin=null_fd,
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT),
+ scheduler=self.scheduler)
+ os.close(null_fd)
+
+ fetcher.pipe_reader = PipeLogger(background=self.background,
+ input_fd=fetcher.proc.stdout, log_file_path=self._log_path,
+ scheduler=self.scheduler)
+
+ self._start_task(fetcher, self._fetcher_exit)
+
+ def _fetcher_exit(self, fetcher):
+
+ self._assert_current(fetcher)
+ if self._was_cancelled():
+ self.wait()
+ return
+
+ if os.path.exists(self._fetch_tmp_file):
+ self._start_task(
+ FileDigester(file_path=self._fetch_tmp_file,
+ hash_names=(self._select_hash(),),
+ background=self.background,
+ logfile=self._log_path),
+ self._fetch_digester_exit)
+ else:
+ self._try_next_mirror()
+
+ def _fetch_digester_exit(self, digester):
+
+ self._assert_current(digester)
+ if self._was_cancelled():
+ self.wait()
+ return
+
+ if digester.returncode != os.EX_OK:
+ msg = "%s %s digester failed unexpectedly" % \
+ (self.distfile, self._fetch_tmp_dir_info)
+ self.scheduler.output(msg + '\n', background=True,
+ log_path=self._log_path)
+ logging.error(msg)
+ else:
+ bad_digest = self._find_bad_digest(digester.digests)
+ if bad_digest is not None:
+ msg = "%s %s has bad %s digest: expected %s, got %s" % \
+ (self.distfile, self._current_mirror.name, bad_digest,
+ self.digests[bad_digest], digester.digests[bad_digest])
+ self.scheduler.output(msg + '\n', background=True,
+ log_path=self._log_path)
+ try:
+ os.unlink(self._fetch_tmp_file)
+ except OSError:
+ pass
+ else:
+ dest = os.path.join(self.config.options.distfiles, self.distfile)
+ try:
+ os.rename(self._fetch_tmp_file, dest)
+ except OSError:
+ self._start_task(
+ FileCopier(src_path=self._fetch_tmp_file,
+ dest_path=dest,
+ background=(self.background and
+ self._log_path is not None),
+ logfile=self._log_path),
+ self._fetch_copier_exit)
+ return
+ else:
+ self._success()
+ self.returncode = os.EX_OK
+ self.wait()
+ return
+
+ self._try_next_mirror()
+
+ def _fetch_copier_exit(self, copier):
+
+ self._assert_current(copier)
+
+ try:
+ os.unlink(self._fetch_tmp_file)
+ except OSError:
+ pass
+
+ if self._was_cancelled():
+ self.wait()
+ return
+
+ if copier.returncode == os.EX_OK:
+ self._success()
+ self.returncode = os.EX_OK
+ self.wait()
+ else:
+ # out of space?
+ msg = "%s %s copy failed unexpectedly" % \
+ (self.distfile, self._fetch_tmp_dir_info)
+ self.scheduler.output(msg + '\n', background=True,
+ log_path=self._log_path)
+ logging.error(msg)
+ self.config.log_failure("%s\t%s\t%s" %
+ (self.cpv, self.distfile, msg))
+ self.config.file_failures[self.distfile] = self.cpv
+ self.returncode = 1
+ self.wait()
+
+ def _unlink_file(self, file_path, dir_info):
+ try:
+ os.unlink(file_path)
+ except OSError as e:
+ if e.errno not in (errno.ENOENT, errno.ESTALE):
+ msg = "unlink '%s' failed in %s: %s" % \
+ (self.distfile, dir_info, e)
+ self.scheduler.output(msg + '\n', background=True,
+ log_path=self._log_path)
+ logging.error(msg)
+ return False
+ return True
+
+ def _have_needed_digests(self):
+ return "size" in self.digests and \
+ self._select_hash() is not None
+
+ def _select_hash(self):
+ if default_hash_name in self.digests:
+ return default_hash_name
+ else:
+ for hash_name in self.digests:
+ if hash_name != "size" and \
+ hash_name in portage.checksum.hashfunc_map:
+ return hash_name
+
+ return None
+
+ def _find_bad_digest(self, digests):
+ for hash_name, hash_value in digests.items():
+ if self.digests[hash_name] != hash_value:
+ return hash_name
+ return None
+
+ @staticmethod
+ def _same_device(path1, path2):
+ try:
+ st1 = os.stat(path1)
+ st2 = os.stat(path2)
+ except OSError:
+ return False
+ else:
+ return st1.st_dev == st2.st_dev
+
+ def _hardlink_atomic(self, src, dest, dir_info):
+
+ head, tail = os.path.split(dest)
+ hardlink_tmp = os.path.join(head, ".%s._mirrordist_hardlink_.%s" % \
+ (tail, os.getpid()))
+
+ try:
+ try:
+ os.link(src, hardlink_tmp)
+ except OSError as e:
+ if e.errno != errno.EXDEV:
+ msg = "hardlink %s from %s failed: %s" % \
+ (self.distfile, dir_info, e)
+ self.scheduler.output(msg + '\n', background=True,
+ log_path=self._log_path)
+ logging.error(msg)
+ return False
+
+ try:
+ os.rename(hardlink_tmp, dest)
+ except OSError as e:
+ msg = "hardlink rename '%s' from %s failed: %s" % \
+ (self.distfile, dir_info, e)
+ self.scheduler.output(msg + '\n', background=True,
+ log_path=self._log_path)
+ logging.error(msg)
+ return False
+ finally:
+ try:
+ os.unlink(hardlink_tmp)
+ except OSError:
+ pass
+
+ return True
diff --git a/pym/portage/_emirrordist/MirrorDistTask.py b/pym/portage/_emirrordist/MirrorDistTask.py
new file mode 100644
index 000000000..b6f875d4b
--- /dev/null
+++ b/pym/portage/_emirrordist/MirrorDistTask.py
@@ -0,0 +1,218 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import errno
+import logging
+import sys
+import time
+
+try:
+ import threading
+except ImportError:
+ import dummy_threading as threading
+
+import portage
+from portage import os
+from portage.util._async.TaskScheduler import TaskScheduler
+from _emerge.CompositeTask import CompositeTask
+from .FetchIterator import FetchIterator
+from .DeletionIterator import DeletionIterator
+
+if sys.hexversion >= 0x3000000:
+ long = int
+
+class MirrorDistTask(CompositeTask):
+
+ __slots__ = ('_config', '_terminated', '_term_check_id')
+
+ def __init__(self, config):
+ CompositeTask.__init__(self, scheduler=config.event_loop)
+ self._config = config
+ self._terminated = threading.Event()
+
+ def _start(self):
+ self._term_check_id = self.scheduler.idle_add(self._termination_check)
+ fetch = TaskScheduler(iter(FetchIterator(self._config)),
+ max_jobs=self._config.options.jobs,
+ max_load=self._config.options.load_average,
+ event_loop=self._config.event_loop)
+ self._start_task(fetch, self._fetch_exit)
+
+ def _fetch_exit(self, fetch):
+
+ self._assert_current(fetch)
+ if self._was_cancelled():
+ self.wait()
+ return
+
+ if self._config.options.delete:
+ deletion = TaskScheduler(iter(DeletionIterator(self._config)),
+ max_jobs=self._config.options.jobs,
+ max_load=self._config.options.load_average,
+ event_loop=self._config.event_loop)
+ self._start_task(deletion, self._deletion_exit)
+ return
+
+ self._post_deletion()
+
+ def _deletion_exit(self, deletion):
+
+ self._assert_current(deletion)
+ if self._was_cancelled():
+ self.wait()
+ return
+
+ self._post_deletion()
+
+ def _post_deletion(self):
+
+ if self._config.options.recycle_db is not None:
+ self._update_recycle_db()
+
+ if self._config.options.scheduled_deletion_log is not None:
+ self._scheduled_deletion_log()
+
+ self._summary()
+
+ self.returncode = os.EX_OK
+ self._current_task = None
+ self.wait()
+
+ def _update_recycle_db(self):
+
+ start_time = self._config.start_time
+ recycle_dir = self._config.options.recycle_dir
+ recycle_db = self._config.recycle_db
+ r_deletion_delay = self._config.options.recycle_deletion_delay
+
+ # Use a dict optimize access.
+ recycle_db_cache = dict(recycle_db.items())
+
+ for filename in os.listdir(recycle_dir):
+
+ recycle_file = os.path.join(recycle_dir, filename)
+
+ try:
+ st = os.stat(recycle_file)
+ except OSError as e:
+ if e.errno not in (errno.ENOENT, errno.ESTALE):
+ logging.error(("stat failed for '%s' in "
+ "recycle: %s") % (filename, e))
+ continue
+
+ value = recycle_db_cache.pop(filename, None)
+ if value is None:
+ logging.debug(("add '%s' to "
+ "recycle db") % filename)
+ recycle_db[filename] = (st.st_size, start_time)
+ else:
+ r_size, r_time = value
+ if long(r_size) != st.st_size:
+ recycle_db[filename] = (st.st_size, start_time)
+ elif r_time + r_deletion_delay < start_time:
+ if self._config.options.dry_run:
+ logging.info(("dry-run: delete '%s' from "
+ "recycle") % filename)
+ logging.info(("drop '%s' from "
+ "recycle db") % filename)
+ else:
+ try:
+ os.unlink(recycle_file)
+ except OSError as e:
+ if e.errno not in (errno.ENOENT, errno.ESTALE):
+ logging.error(("delete '%s' from "
+ "recycle failed: %s") % (filename, e))
+ else:
+ logging.debug(("delete '%s' from "
+ "recycle") % filename)
+ try:
+ del recycle_db[filename]
+ except KeyError:
+ pass
+ else:
+ logging.debug(("drop '%s' from "
+ "recycle db") % filename)
+
+ # Existing files were popped from recycle_db_cache,
+ # so any remaining entries are for files that no
+ # longer exist.
+ for filename in recycle_db_cache:
+ try:
+ del recycle_db[filename]
+ except KeyError:
+ pass
+ else:
+ logging.debug(("drop non-existent '%s' from "
+ "recycle db") % filename)
+
+ def _scheduled_deletion_log(self):
+
+ start_time = self._config.start_time
+ dry_run = self._config.options.dry_run
+ deletion_delay = self._config.options.deletion_delay
+ distfiles_db = self._config.distfiles_db
+
+ date_map = {}
+ for filename, timestamp in self._config.deletion_db.items():
+ date = timestamp + deletion_delay
+ if date < start_time:
+ date = start_time
+ date = time.strftime("%Y-%m-%d", time.gmtime(date))
+ date_files = date_map.get(date)
+ if date_files is None:
+ date_files = []
+ date_map[date] = date_files
+ date_files.append(filename)
+
+ if dry_run:
+ logging.warn(("dry-run: scheduled-deletions log "
+ "will be summarized via logging.info"))
+
+ lines = []
+ for date in sorted(date_map):
+ date_files = date_map[date]
+ if dry_run:
+ logging.info(("dry-run: scheduled deletions for %s: %s files") %
+ (date, len(date_files)))
+ lines.append("%s\n" % date)
+ for filename in date_files:
+ cpv = "unknown"
+ if distfiles_db is not None:
+ cpv = distfiles_db.get(filename, cpv)
+ lines.append("\t%s\t%s\n" % (filename, cpv))
+
+ if not dry_run:
+ portage.util.write_atomic(
+ self._config.options.scheduled_deletion_log,
+ "".join(lines))
+
+ def _summary(self):
+ elapsed_time = time.time() - self._config.start_time
+ fail_count = len(self._config.file_failures)
+ delete_count = self._config.delete_count
+ scheduled_deletion_count = self._config.scheduled_deletion_count - delete_count
+ added_file_count = self._config.added_file_count
+ added_byte_count = self._config.added_byte_count
+
+ logging.info("finished in %i seconds" % elapsed_time)
+ logging.info("failed to fetch %i files" % fail_count)
+ logging.info("deleted %i files" % delete_count)
+ logging.info("deletion of %i files scheduled" %
+ scheduled_deletion_count)
+ logging.info("added %i files" % added_file_count)
+ logging.info("added %i bytes total" % added_byte_count)
+
+ def terminate(self):
+ self._terminated.set()
+
+ def _termination_check(self):
+ if self._terminated.is_set():
+ self.cancel()
+ self.wait()
+ return True
+
+ def _wait(self):
+ CompositeTask._wait(self)
+ if self._term_check_id is not None:
+ self.scheduler.source_remove(self._term_check_id)
+ self._term_check_id = None
diff --git a/pym/portage/_emirrordist/__init__.py b/pym/portage/_emirrordist/__init__.py
new file mode 100644
index 000000000..6cde9320b
--- /dev/null
+++ b/pym/portage/_emirrordist/__init__.py
@@ -0,0 +1,2 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
diff --git a/pym/portage/_emirrordist/main.py b/pym/portage/_emirrordist/main.py
new file mode 100644
index 000000000..6b7c22ff1
--- /dev/null
+++ b/pym/portage/_emirrordist/main.py
@@ -0,0 +1,438 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import logging
+import optparse
+import sys
+
+import portage
+from portage import os
+from portage.util import normalize_path
+from portage.util._async.run_main_scheduler import run_main_scheduler
+from portage.util._async.SchedulerInterface import SchedulerInterface
+from portage.util._eventloop.global_event_loop import global_event_loop
+from portage._emirrordist.Config import Config
+from .Config import Config
+from .MirrorDistTask import MirrorDistTask
+
+if sys.hexversion >= 0x3000000:
+ long = int
+
+seconds_per_day = 24 * 60 * 60
+
+common_options = (
+ {
+ "longopt" : "--dry-run",
+ "help" : "perform a trial run with no changes made (usually combined "
+ "with --verbose)",
+ "action" : "store_true"
+ },
+ {
+ "longopt" : "--verbose",
+ "shortopt" : "-v",
+ "help" : "display extra information on stderr "
+ "(multiple occurences increase verbosity)",
+ "action" : "count",
+ "default" : 0,
+ },
+ {
+ "longopt" : "--ignore-default-opts",
+ "help" : "do not use the EMIRRORDIST_DEFAULT_OPTS environment variable",
+ "action" : "store_true"
+ },
+ {
+ "longopt" : "--distfiles",
+ "help" : "distfiles directory to use (required)",
+ "metavar" : "DIR"
+ },
+ {
+ "longopt" : "--jobs",
+ "shortopt" : "-j",
+ "help" : "number of concurrent jobs to run"
+ },
+ {
+ "longopt" : "--load-average",
+ "shortopt" : "-l",
+ "help" : "load average limit for spawning of new concurrent jobs",
+ "metavar" : "LOAD"
+ },
+ {
+ "longopt" : "--tries",
+ "help" : "maximum number of tries per file, 0 means unlimited (default is 10)",
+ "default" : 10
+ },
+ {
+ "longopt" : "--repo",
+ "help" : "name of repo to operate on (default repo is located at $PORTDIR)"
+ },
+ {
+ "longopt" : "--config-root",
+ "help" : "location of portage config files",
+ "metavar" : "DIR"
+ },
+ {
+ "longopt" : "--portdir",
+ "help" : "override the portage tree location",
+ "metavar" : "DIR"
+ },
+ {
+ "longopt" : "--portdir-overlay",
+ "help" : "override the PORTDIR_OVERLAY variable (requires "
+ "that --repo is also specified)"
+ },
+ {
+ "longopt" : "--strict-manifests",
+ "help" : "manually override \"strict\" FEATURES setting",
+ "choices" : ("y", "n"),
+ "metavar" : "<y|n>",
+ "type" : "choice"
+ },
+ {
+ "longopt" : "--failure-log",
+ "help" : "log file for fetch failures, with tab-delimited "
+ "output, for reporting purposes",
+ "metavar" : "FILE"
+ },
+ {
+ "longopt" : "--success-log",
+ "help" : "log file for fetch successes, with tab-delimited "
+ "output, for reporting purposes",
+ "metavar" : "FILE"
+ },
+ {
+ "longopt" : "--scheduled-deletion-log",
+ "help" : "log file for scheduled deletions, with tab-delimited "
+ "output, for reporting purposes",
+ "metavar" : "FILE"
+ },
+ {
+ "longopt" : "--delete",
+ "help" : "enable deletion of unused distfiles",
+ "action" : "store_true"
+ },
+ {
+ "longopt" : "--deletion-db",
+ "help" : "database file used to track lifetime of files "
+ "scheduled for delayed deletion",
+ "metavar" : "FILE"
+ },
+ {
+ "longopt" : "--deletion-delay",
+ "help" : "delay time for deletion, measured in seconds",
+ "metavar" : "SECONDS"
+ },
+ {
+ "longopt" : "--temp-dir",
+ "help" : "temporary directory for downloads",
+ "metavar" : "DIR"
+ },
+ {
+ "longopt" : "--mirror-overrides",
+ "help" : "file holding a list of mirror overrides",
+ "metavar" : "FILE"
+ },
+ {
+ "longopt" : "--mirror-skip",
+ "help" : "comma delimited list of mirror targets to skip "
+ "when fetching"
+ },
+ {
+ "longopt" : "--restrict-mirror-exemptions",
+ "help" : "comma delimited list of mirror targets for which to "
+ "ignore RESTRICT=\"mirror\""
+ },
+ {
+ "longopt" : "--verify-existing-digest",
+ "help" : "use digest as a verification of whether existing "
+ "distfiles are valid",
+ "action" : "store_true"
+ },
+ {
+ "longopt" : "--distfiles-local",
+ "help" : "distfiles-local directory to use",
+ "metavar" : "DIR"
+ },
+ {
+ "longopt" : "--distfiles-db",
+ "help" : "database file used to track which ebuilds a "
+ "distfile belongs to",
+ "metavar" : "FILE"
+ },
+ {
+ "longopt" : "--recycle-dir",
+ "help" : "directory for extended retention of files that "
+ "are removed from distdir with the --delete option",
+ "metavar" : "DIR"
+ },
+ {
+ "longopt" : "--recycle-db",
+ "help" : "database file used to track lifetime of files "
+ "in recycle dir",
+ "metavar" : "FILE"
+ },
+ {
+ "longopt" : "--recycle-deletion-delay",
+ "help" : "delay time for deletion of unused files from "
+ "recycle dir, measured in seconds (defaults to "
+ "the equivalent of 60 days)",
+ "default" : 60 * seconds_per_day,
+ "metavar" : "SECONDS"
+ },
+ {
+ "longopt" : "--fetch-log-dir",
+ "help" : "directory for individual fetch logs",
+ "metavar" : "DIR"
+ },
+ {
+ "longopt" : "--whitelist-from",
+ "help" : "specifies a file containing a list of files to "
+ "whitelist, one per line, # prefixed lines ignored",
+ "action" : "append",
+ "metavar" : "FILE"
+ },
+)
+
+def parse_args(args):
+ description = "emirrordist - a fetch tool for mirroring " \
+ "of package distfiles"
+ usage = "emirrordist [options] <action>"
+ parser = optparse.OptionParser(description=description, usage=usage)
+
+ actions = optparse.OptionGroup(parser, 'Actions')
+ actions.add_option("--version",
+ action="store_true",
+ help="display portage version and exit")
+ actions.add_option("--mirror",
+ action="store_true",
+ help="mirror distfiles for the selected repository")
+ parser.add_option_group(actions)
+
+ common = optparse.OptionGroup(parser, 'Common options')
+ for opt_info in common_options:
+ opt_pargs = [opt_info["longopt"]]
+ if opt_info.get("shortopt"):
+ opt_pargs.append(opt_info["shortopt"])
+ opt_kwargs = {"help" : opt_info["help"]}
+ for k in ("action", "choices", "default", "metavar", "type"):
+ if k in opt_info:
+ opt_kwargs[k] = opt_info[k]
+ common.add_option(*opt_pargs, **opt_kwargs)
+ parser.add_option_group(common)
+
+ options, args = parser.parse_args(args)
+
+ return (parser, options, args)
+
+def emirrordist_main(args):
+
+ parser, options, args = parse_args(args)
+
+ if options.version:
+ sys.stdout.write("Portage %s\n" % portage.VERSION)
+ return os.EX_OK
+
+ config_root = options.config_root
+
+ # The calling environment is ignored, so the program is
+ # completely controlled by commandline arguments.
+ env = {}
+
+ if options.repo is None:
+ env['PORTDIR_OVERLAY'] = ''
+ elif options.portdir_overlay:
+ env['PORTDIR_OVERLAY'] = options.portdir_overlay
+
+ if options.portdir is not None:
+ env['PORTDIR'] = options.portdir
+
+ settings = portage.config(config_root=config_root,
+ local_config=False, env=env)
+
+ default_opts = None
+ if not options.ignore_default_opts:
+ default_opts = settings.get('EMIRRORDIST_DEFAULT_OPTS', '').split()
+
+ if default_opts:
+ parser, options, args = parse_args(default_opts + args)
+
+ settings = portage.config(config_root=config_root,
+ local_config=False, env=env)
+
+ repo_path = None
+ if options.repo is not None:
+ repo_path = settings.repositories.treemap.get(options.repo)
+ if repo_path is None:
+ parser.error("Unable to locate repository named '%s'" % \
+ (options.repo,))
+ else:
+ repo_path = settings.repositories.mainRepoLocation()
+ if not repo_path:
+ parser.error("PORTDIR is undefined")
+
+ if options.jobs is not None:
+ options.jobs = int(options.jobs)
+
+ if options.load_average is not None:
+ options.load_average = float(options.load_average)
+
+ if options.failure_log is not None:
+ options.failure_log = normalize_path(
+ os.path.abspath(options.failure_log))
+
+ parent_dir = os.path.dirname(options.failure_log)
+ if not (os.path.isdir(parent_dir) and
+ os.access(parent_dir, os.W_OK|os.X_OK)):
+ parser.error(("--failure-log '%s' parent is not a "
+ "writable directory") % options.failure_log)
+
+ if options.success_log is not None:
+ options.success_log = normalize_path(
+ os.path.abspath(options.success_log))
+
+ parent_dir = os.path.dirname(options.success_log)
+ if not (os.path.isdir(parent_dir) and
+ os.access(parent_dir, os.W_OK|os.X_OK)):
+ parser.error(("--success-log '%s' parent is not a "
+ "writable directory") % options.success_log)
+
+ if options.scheduled_deletion_log is not None:
+ options.scheduled_deletion_log = normalize_path(
+ os.path.abspath(options.scheduled_deletion_log))
+
+ parent_dir = os.path.dirname(options.scheduled_deletion_log)
+ if not (os.path.isdir(parent_dir) and
+ os.access(parent_dir, os.W_OK|os.X_OK)):
+ parser.error(("--scheduled-deletion-log '%s' parent is not a "
+ "writable directory") % options.scheduled_deletion_log)
+
+ if options.deletion_db is None:
+ parser.error("--scheduled-deletion-log requires --deletion-db")
+
+ if options.deletion_delay is not None:
+ options.deletion_delay = long(options.deletion_delay)
+ if options.deletion_db is None:
+ parser.error("--deletion-delay requires --deletion-db")
+
+ if options.deletion_db is not None:
+ if options.deletion_delay is None:
+ parser.error("--deletion-db requires --deletion-delay")
+ options.deletion_db = normalize_path(
+ os.path.abspath(options.deletion_db))
+
+ if options.temp_dir is not None:
+ options.temp_dir = normalize_path(
+ os.path.abspath(options.temp_dir))
+
+ if not (os.path.isdir(options.temp_dir) and
+ os.access(options.temp_dir, os.W_OK|os.X_OK)):
+ parser.error(("--temp-dir '%s' is not a "
+ "writable directory") % options.temp_dir)
+
+ if options.distfiles is not None:
+ options.distfiles = normalize_path(
+ os.path.abspath(options.distfiles))
+
+ if not (os.path.isdir(options.distfiles) and
+ os.access(options.distfiles, os.W_OK|os.X_OK)):
+ parser.error(("--distfiles '%s' is not a "
+ "writable directory") % options.distfiles)
+ else:
+ parser.error("missing required --distfiles parameter")
+
+ if options.mirror_overrides is not None:
+ options.mirror_overrides = normalize_path(
+ os.path.abspath(options.mirror_overrides))
+
+ if not (os.access(options.mirror_overrides, os.R_OK) and
+ os.path.isfile(options.mirror_overrides)):
+ parser.error(
+ "--mirror-overrides-file '%s' is not a readable file" %
+ options.mirror_overrides)
+
+ if options.distfiles_local is not None:
+ options.distfiles_local = normalize_path(
+ os.path.abspath(options.distfiles_local))
+
+ if not (os.path.isdir(options.distfiles_local) and
+ os.access(options.distfiles_local, os.W_OK|os.X_OK)):
+ parser.error(("--distfiles-local '%s' is not a "
+ "writable directory") % options.distfiles_local)
+
+ if options.distfiles_db is not None:
+ options.distfiles_db = normalize_path(
+ os.path.abspath(options.distfiles_db))
+
+ if options.tries is not None:
+ options.tries = int(options.tries)
+
+ if options.recycle_dir is not None:
+ options.recycle_dir = normalize_path(
+ os.path.abspath(options.recycle_dir))
+ if not (os.path.isdir(options.recycle_dir) and
+ os.access(options.recycle_dir, os.W_OK|os.X_OK)):
+ parser.error(("--recycle-dir '%s' is not a "
+ "writable directory") % options.recycle_dir)
+
+ if options.recycle_db is not None:
+ if options.recycle_dir is None:
+ parser.error("--recycle-db requires "
+ "--recycle-dir to be specified")
+ options.recycle_db = normalize_path(
+ os.path.abspath(options.recycle_db))
+
+ if options.recycle_deletion_delay is not None:
+ options.recycle_deletion_delay = \
+ long(options.recycle_deletion_delay)
+
+ if options.fetch_log_dir is not None:
+ options.fetch_log_dir = normalize_path(
+ os.path.abspath(options.fetch_log_dir))
+
+ if not (os.path.isdir(options.fetch_log_dir) and
+ os.access(options.fetch_log_dir, os.W_OK|os.X_OK)):
+ parser.error(("--fetch-log-dir '%s' is not a "
+ "writable directory") % options.fetch_log_dir)
+
+ if options.whitelist_from:
+ normalized_paths = []
+ for x in options.whitelist_from:
+ path = normalize_path(os.path.abspath(x))
+ normalized_paths.append(path)
+ if not (os.access(path, os.R_OK) and os.path.isfile(path)):
+ parser.error(
+ "--whitelist-from '%s' is not a readable file" % x)
+ options.whitelist_from = normalized_paths
+
+ if options.strict_manifests is not None:
+ if options.strict_manifests == "y":
+ settings.features.add("strict")
+ else:
+ settings.features.discard("strict")
+
+ settings.lock()
+
+ portdb = portage.portdbapi(mysettings=settings)
+
+ # Limit ebuilds to the specified repo.
+ portdb.porttrees = [repo_path]
+
+ portage.util.initialize_logger()
+
+ if options.verbose > 0:
+ l = logging.getLogger()
+ l.setLevel(l.getEffectiveLevel() - 10 * options.verbose)
+
+ with Config(options, portdb,
+ SchedulerInterface(global_event_loop())) as config:
+
+ if not options.mirror:
+ parser.error('No action specified')
+
+ returncode = os.EX_OK
+
+ if options.mirror:
+ signum = run_main_scheduler(MirrorDistTask(config))
+ if signum is not None:
+ sys.exit(128 + signum)
+
+ return returncode
diff --git a/pym/portage/util/_ShelveUnicodeWrapper.py b/pym/portage/util/_ShelveUnicodeWrapper.py
new file mode 100644
index 000000000..adbd5199f
--- /dev/null
+++ b/pym/portage/util/_ShelveUnicodeWrapper.py
@@ -0,0 +1,45 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+class ShelveUnicodeWrapper(object):
+ """
+ Convert unicode to str and back again, since python-2.x shelve
+ module doesn't support unicode.
+ """
+ def __init__(self, shelve_instance):
+ self._shelve = shelve_instance
+
+ def _encode(self, s):
+ if isinstance(s, unicode):
+ s = s.encode('utf_8')
+ return s
+
+ def __len__(self):
+ return len(self._shelve)
+
+ def __contains__(self, k):
+ return self._encode(k) in self._shelve
+
+ def __iter__(self):
+ return self._shelve.__iter__()
+
+ def items(self):
+ return self._shelve.iteritems()
+
+ def __setitem__(self, k, v):
+ self._shelve[self._encode(k)] = self._encode(v)
+
+ def __getitem__(self, k):
+ return self._shelve[self._encode(k)]
+
+ def __delitem__(self, k):
+ del self._shelve[self._encode(k)]
+
+ def get(self, k, *args):
+ return self._shelve.get(self._encode(k), *args)
+
+ def close(self):
+ self._shelve.close()
+
+ def clear(self):
+ self._shelve.clear()
diff --git a/pym/portage/util/_async/FileCopier.py b/pym/portage/util/_async/FileCopier.py
new file mode 100644
index 000000000..27e5ab4c0
--- /dev/null
+++ b/pym/portage/util/_async/FileCopier.py
@@ -0,0 +1,17 @@
+# Copyright 2013 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+from portage import os
+from portage import shutil
+from portage.util._async.ForkProcess import ForkProcess
+
+class FileCopier(ForkProcess):
+ """
+ Asynchronously copy a file.
+ """
+
+ __slots__ = ('src_path', 'dest_path')
+
+ def _run(self):
+ shutil.copy(self.src_path, self.dest_path)
+ return os.EX_OK