diff options
Diffstat (limited to 'pym/portage/_emirrordist/MirrorDistTask.py')
-rw-r--r-- | pym/portage/_emirrordist/MirrorDistTask.py | 218 |
1 file changed, 218 insertions(+), 0 deletions(-)
# Copyright 2013 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2

import errno
import logging
import sys
import time

try:
	import threading
except ImportError:
	import dummy_threading as threading

import portage
from portage import os
from portage.util._async.TaskScheduler import TaskScheduler
from _emerge.CompositeTask import CompositeTask
from .FetchIterator import FetchIterator
from .DeletionIterator import DeletionIterator

if sys.hexversion >= 0x3000000:
	# Python 3 has no separate long type; alias it so the
	# long(r_size) compatibility call below works on both majors.
	long = int

class MirrorDistTask(CompositeTask):
	"""
	Top-level emirrordist task: fetch all distfiles, optionally run
	scheduled deletions, maintain the recycle database, write the
	scheduled-deletion log, and emit a run summary. Runs on the
	config's event loop and supports cooperative termination via
	terminate() from another thread.
	"""

	__slots__ = ('_config', '_terminated', '_term_check_id')

	def __init__(self, config):
		CompositeTask.__init__(self, scheduler=config.event_loop)
		self._config = config
		self._terminated = threading.Event()
		# Initialize here (not only in _start) so that _wait() is
		# safe even when the task is cancelled or waited on before
		# _start() ever ran; otherwise the __slots__ attribute is
		# unset and _wait() raises AttributeError.
		self._term_check_id = None

	def _start(self):
		"""Schedule the termination poller and start the fetch phase."""
		# Poll for terminate() requests from the event loop, since
		# terminate() may be called from a signal handler / thread.
		self._term_check_id = self.scheduler.idle_add(self._termination_check)
		fetch = TaskScheduler(iter(FetchIterator(self._config)),
			max_jobs=self._config.options.jobs,
			max_load=self._config.options.load_average,
			event_loop=self._config.event_loop)
		self._start_task(fetch, self._fetch_exit)

	def _fetch_exit(self, fetch):
		"""Fetch phase finished: run deletions if requested, else wrap up."""
		self._assert_current(fetch)
		if self._was_cancelled():
			self.wait()
			return

		if self._config.options.delete:
			deletion = TaskScheduler(iter(DeletionIterator(self._config)),
				max_jobs=self._config.options.jobs,
				max_load=self._config.options.load_average,
				event_loop=self._config.event_loop)
			self._start_task(deletion, self._deletion_exit)
			return

		self._post_deletion()

	def _deletion_exit(self, deletion):
		"""Deletion phase finished: proceed to post-deletion bookkeeping."""
		self._assert_current(deletion)
		if self._was_cancelled():
			self.wait()
			return

		self._post_deletion()

	def _post_deletion(self):
		"""Run optional recycle-db / log maintenance, then finish."""
		if self._config.options.recycle_db is not None:
			self._update_recycle_db()

		if self._config.options.scheduled_deletion_log is not None:
			self._scheduled_deletion_log()

		self._summary()

		self.returncode = os.EX_OK
		self._current_task = None
		self.wait()

	def _update_recycle_db(self):
		"""
		Synchronize the recycle database with the recycle directory:
		register new files, refresh entries whose size changed, delete
		files older than recycle_deletion_delay, and drop db entries
		for files that no longer exist on disk.
		"""
		start_time = self._config.start_time
		recycle_dir = self._config.options.recycle_dir
		recycle_db = self._config.recycle_db
		r_deletion_delay = self._config.options.recycle_deletion_delay

		# Use a dict to optimize access (one snapshot instead of
		# per-file backend lookups).
		recycle_db_cache = dict(recycle_db.items())

		for filename in os.listdir(recycle_dir):

			recycle_file = os.path.join(recycle_dir, filename)

			try:
				st = os.stat(recycle_file)
			except OSError as e:
				# ENOENT/ESTALE just mean the file vanished between
				# listdir and stat; anything else is worth reporting.
				if e.errno not in (errno.ENOENT, errno.ESTALE):
					logging.error(("stat failed for '%s' in "
						"recycle: %s") % (filename, e))
				continue

			value = recycle_db_cache.pop(filename, None)
			if value is None:
				logging.debug(("add '%s' to "
					"recycle db") % filename)
				recycle_db[filename] = (st.st_size, start_time)
			else:
				r_size, r_time = value
				if long(r_size) != st.st_size:
					# Size changed: treat as a new file and restart
					# its deletion-delay clock.
					recycle_db[filename] = (st.st_size, start_time)
				elif r_time + r_deletion_delay < start_time:
					if self._config.options.dry_run:
						logging.info(("dry-run: delete '%s' from "
							"recycle") % filename)
						logging.info(("drop '%s' from "
							"recycle db") % filename)
					else:
						try:
							os.unlink(recycle_file)
						except OSError as e:
							if e.errno not in (errno.ENOENT, errno.ESTALE):
								logging.error(("delete '%s' from "
									"recycle failed: %s") % (filename, e))
						else:
							logging.debug(("delete '%s' from "
								"recycle") % filename)
							try:
								del recycle_db[filename]
							except KeyError:
								pass
							else:
								logging.debug(("drop '%s' from "
									"recycle db") % filename)

		# Existing files were popped from recycle_db_cache,
		# so any remaining entries are for files that no
		# longer exist.
		for filename in recycle_db_cache:
			try:
				del recycle_db[filename]
			except KeyError:
				pass
			else:
				logging.debug(("drop non-existent '%s' from "
					"recycle db") % filename)

	def _scheduled_deletion_log(self):
		"""
		Write (or, in dry-run mode, log) a report of files scheduled
		for deletion, grouped by the UTC date on which each becomes
		eligible (never earlier than this run's start time).
		"""
		start_time = self._config.start_time
		dry_run = self._config.options.dry_run
		deletion_delay = self._config.options.deletion_delay
		distfiles_db = self._config.distfiles_db

		date_map = {}
		for filename, timestamp in self._config.deletion_db.items():
			date = timestamp + deletion_delay
			if date < start_time:
				# Already past due; attribute it to the current run.
				date = start_time
			date = time.strftime("%Y-%m-%d", time.gmtime(date))
			date_files = date_map.get(date)
			if date_files is None:
				date_files = []
				date_map[date] = date_files
			date_files.append(filename)

		if dry_run:
			# logging.warn is a deprecated alias; use warning().
			logging.warning(("dry-run: scheduled-deletions log "
				"will be summarized via logging.info"))

		lines = []
		for date in sorted(date_map):
			date_files = date_map[date]
			if dry_run:
				logging.info(("dry-run: scheduled deletions for %s: %s files") %
					(date, len(date_files)))
			lines.append("%s\n" % date)
			for filename in date_files:
				cpv = "unknown"
				if distfiles_db is not None:
					cpv = distfiles_db.get(filename, cpv)
				lines.append("\t%s\t%s\n" % (filename, cpv))

		if not dry_run:
			portage.util.write_atomic(
				self._config.options.scheduled_deletion_log,
				"".join(lines))

	def _summary(self):
		"""Log run statistics: elapsed time, failures, deletions, additions."""
		elapsed_time = time.time() - self._config.start_time
		fail_count = len(self._config.file_failures)
		delete_count = self._config.delete_count
		scheduled_deletion_count = self._config.scheduled_deletion_count - delete_count
		added_file_count = self._config.added_file_count
		added_byte_count = self._config.added_byte_count

		logging.info("finished in %i seconds" % elapsed_time)
		logging.info("failed to fetch %i files" % fail_count)
		logging.info("deleted %i files" % delete_count)
		logging.info("deletion of %i files scheduled" %
			scheduled_deletion_count)
		logging.info("added %i files" % added_file_count)
		logging.info("added %i bytes total" % added_byte_count)

	def terminate(self):
		"""Request cooperative shutdown (safe to call from any thread)."""
		self._terminated.set()

	def _termination_check(self):
		"""Idle callback: cancel the task once terminate() was requested.

		Returns True so the event loop keeps the callback scheduled.
		"""
		if self._terminated.is_set():
			self.cancel()
			self.wait()
		return True

	def _wait(self):
		CompositeTask._wait(self)
		# Remove the idle-time termination poller; guarded so this is
		# a no-op when _start() never registered it.
		if self._term_check_id is not None:
			self.scheduler.source_remove(self._term_check_id)
			self._term_check_id = None