diff options
Diffstat (limited to 'pym')
-rw-r--r-- | pym/_emerge/PollScheduler.py | 6 | ||||
-rw-r--r-- | pym/portage/manifest.py | 10 | ||||
-rw-r--r-- | pym/portage/package/ebuild/_parallel_manifest/ManifestProcess.py | 43 | ||||
-rw-r--r-- | pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py | 79 | ||||
-rw-r--r-- | pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py | 75 | ||||
-rw-r--r-- | pym/portage/package/ebuild/_parallel_manifest/__init__.py | 2 | ||||
-rw-r--r-- | pym/portage/util/_async/AsyncScheduler.py | 88 | ||||
-rw-r--r-- | pym/portage/util/_async/ForkProcess.py | 48 | ||||
-rw-r--r-- | pym/portage/util/_async/__init__.py | 2 |
9 files changed, 349 insertions, 4 deletions
diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py index 808fa6e1f..bcf80ab5a 100644 --- a/pym/_emerge/PollScheduler.py +++ b/pym/_emerge/PollScheduler.py @@ -30,7 +30,7 @@ class PollScheduler(object): "output", "register", "run", "source_remove", "timeout_add", "unregister") - def __init__(self, main=False): + def __init__(self, main=False, event_loop=None): """ @param main: If True then use global_event_loop(), otherwise use a local EventLoop instance (default is False, for safe use in @@ -44,7 +44,9 @@ class PollScheduler(object): self._jobs = 0 self._scheduling = False self._background = False - if main: + if event_loop is not None: + self._event_loop = event_loop + elif main: self._event_loop = global_event_loop() else: self._event_loop = EventLoop(main=False) diff --git a/pym/portage/manifest.py b/pym/portage/manifest.py index b81b580d5..9a85c8f6d 100644 --- a/pym/portage/manifest.py +++ b/pym/portage/manifest.py @@ -266,9 +266,12 @@ class Manifest(object): (MANIFEST2_REQUIRED_HASH, t, f)) def write(self, sign=False, force=False): - """ Write Manifest instance to disk, optionally signing it """ + """ Write Manifest instance to disk, optionally signing it. Returns + True if the Manifest is actually written, and False if the write + is skipped due to existing Manifest being identical.""" + rval = False if not self.allow_create: - return + return rval self.checkIntegrity() try: myentries = list(self._createManifestEntries()) @@ -301,6 +304,7 @@ class Manifest(object): # non-empty for all currently known use cases. write_atomic(self.getFullname(), "".join("%s\n" % _unicode(myentry) for myentry in myentries)) + rval = True else: # With thin manifest, there's no need to have # a Manifest file if there are no DIST entries. @@ -309,6 +313,7 @@ class Manifest(object): except OSError as e: if e.errno != errno.ENOENT: raise + rval = True if sign: self.sign() @@ -316,6 +321,7 @@ class Manifest(object): if e.errno == errno.EACCES: raise PermissionDenied(str(e)) raise + return rval def sign(self): """ Sign the Manifest """ diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestProcess.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestProcess.py new file mode 100644 index 000000000..44e257664 --- /dev/null +++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestProcess.py @@ -0,0 +1,43 @@ +# Copyright 2012 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import portage +from portage import os +from portage.exception import (FileNotFound, + PermissionDenied, PortagePackageException) +from portage.localization import _ +from portage.util._async.ForkProcess import ForkProcess + +class ManifestProcess(ForkProcess): + + __slots__ = ("cp", "distdir", "fetchlist_dict", "repo_config") + + MODIFIED = 16 + + def _run(self): + mf = self.repo_config.load_manifest( + os.path.join(self.repo_config.location, self.cp), + self.distdir, fetchlist_dict=self.fetchlist_dict) + + try: + mf.create(assumeDistHashesAlways=True) + except FileNotFound as e: + portage.writemsg(_("!!! File %s doesn't exist, can't update " + "Manifest\n") % e, noiselevel=-1) + return 1 + + except PortagePackageException as e: + portage.writemsg(("!!! %s\n") % (e,), noiselevel=-1) + return 1 + + try: + modified = mf.write(sign=False) + except PermissionDenied as e: + portage.writemsg("!!! %s: %s\n" % (_("Permission Denied"), e,), + noiselevel=-1) + return 1 + else: + if modified: + return self.MODIFIED + else: + return os.EX_OK diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py new file mode 100644 index 000000000..b480e7738 --- /dev/null +++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestScheduler.py @@ -0,0 +1,79 @@ +# Copyright 2012 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import portage +from portage import os +from portage.dep import _repo_separator +from portage.localization import _ +from portage.util._async.AsyncScheduler import AsyncScheduler +from .ManifestTask import ManifestTask + +class ManifestScheduler(AsyncScheduler): + + def __init__(self, portdb, cp_iter=None, + gpg_cmd=None, gpg_vars=None, **kwargs): + + AsyncScheduler.__init__(self, **kwargs) + + self._portdb = portdb + + if cp_iter is None: + cp_iter = self._iter_every_cp() + self._cp_iter = cp_iter + self._gpg_cmd = gpg_cmd + self._gpg_vars = gpg_vars + self._task_iter = self._iter_tasks() + + def _next_task(self): + return next(self._task_iter) + + def _iter_every_cp(self): + every_cp = self._portdb.cp_all() + every_cp.sort(reverse=True) + try: + while not self._terminated_tasks: + yield every_cp.pop() + except IndexError: + pass + + def _iter_tasks(self): + portdb = self._portdb + distdir = portdb.settings["DISTDIR"] + disabled_repos = set() + + for cp in self._cp_iter: + if self._terminated_tasks: + break + # We iterate over portdb.porttrees, since it's common to + # tweak this attribute in order to adjust repo selection. + for mytree in portdb.porttrees: + repo_config = portdb.repositories.get_repo_for_location(mytree) + if not repo_config.create_manifest: + if repo_config.name not in disabled_repos: + disabled_repos.add(repo_config.name) + portage.writemsg( + _(">>> Skipping creating Manifest for %s%s%s; " + "repository is configured to not use them\n") % + (cp, _repo_separator, repo_config.name), + noiselevel=-1) + continue + cpv_list = portdb.cp_list(cp, mytree=[repo_config.location]) + if not cpv_list: + continue + fetchlist_dict = {} + for cpv in cpv_list: + fetchlist_dict[cpv] = \ + list(portdb.getFetchMap(cpv, mytree=mytree)) + + yield ManifestTask(cp=cp, distdir=distdir, + fetchlist_dict=fetchlist_dict, repo_config=repo_config, + gpg_cmd=self._gpg_cmd, gpg_vars=self._gpg_vars) + + def _task_exit(self, task): + AsyncScheduler._task_exit(self, task) + if task.returncode != os.EX_OK: + if not self._terminated_tasks: + portage.writemsg( + "Error processing %s%s%s, continuing...\n" % + (task.cp, _repo_separator, task.repo_config.name), + noiselevel=-1) diff --git a/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py new file mode 100644 index 000000000..53b85b254 --- /dev/null +++ b/pym/portage/package/ebuild/_parallel_manifest/ManifestTask.py @@ -0,0 +1,75 @@ +# Copyright 2012 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +from portage import os +from portage.util import shlex_split, varexpand, writemsg +from _emerge.CompositeTask import CompositeTask +from _emerge.SpawnProcess import SpawnProcess +from .ManifestProcess import ManifestProcess + +class ManifestTask(CompositeTask): + + __slots__ = ("cp", "distdir", "fetchlist_dict", "gpg_cmd", + "gpg_vars", "repo_config", "_manifest_path") + + def _start(self): + self._manifest_path = os.path.join(self.repo_config.location, + self.cp, "Manifest") + manifest_proc = ManifestProcess(cp=self.cp, distdir=self.distdir, + fetchlist_dict=self.fetchlist_dict, repo_config=self.repo_config, + scheduler=self.scheduler) + self._start_task(manifest_proc, self._manifest_proc_exit) + + def _manifest_proc_exit(self, manifest_proc): + self._assert_current(manifest_proc) + if manifest_proc.returncode not in (os.EX_OK, manifest_proc.MODIFIED): + self.returncode = manifest_proc.returncode + self._current_task = None + self.wait() + return + + modified = manifest_proc.returncode == manifest_proc.MODIFIED + + if self.gpg_cmd is None or not modified or \ + not os.path.exists(self._manifest_path): + self.returncode = os.EX_OK + self._current_task = None + self.wait() + return + + self._start_gpg_proc() + + def _start_gpg_proc(self): + gpg_vars = self.gpg_vars + if gpg_vars is None: + gpg_vars = {} + else: + gpg_vars = gpg_vars.copy() + gpg_vars["FILE"] = self._manifest_path + gpg_cmd = varexpand(self.gpg_cmd, mydict=gpg_vars) + gpg_cmd = shlex_split(gpg_cmd) + gpg_proc = SpawnProcess( + args=gpg_cmd, env=os.environ, scheduler=self.scheduler) + self._start_task(gpg_proc, self._gpg_proc_exit) + + def _gpg_proc_exit(self, gpg_proc): + if self._default_exit(gpg_proc) != os.EX_OK: + self.wait() + return + + rename_args = (self._manifest_path + ".asc", self._manifest_path) + try: + os.rename(*rename_args) + except OSError as e: + writemsg("!!! rename('%s', '%s'): %s\n" % rename_args + (e,), + noiselevel=-1) + try: + os.unlink(self._manifest_path + ".asc") + except OSError: + pass + self.returncode = 1 + else: + self.returncode = os.EX_OK + + self._current_task = None + self.wait() diff --git a/pym/portage/package/ebuild/_parallel_manifest/__init__.py b/pym/portage/package/ebuild/_parallel_manifest/__init__.py new file mode 100644 index 000000000..418ad862b --- /dev/null +++ b/pym/portage/package/ebuild/_parallel_manifest/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2012 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 diff --git a/pym/portage/util/_async/AsyncScheduler.py b/pym/portage/util/_async/AsyncScheduler.py new file mode 100644 index 000000000..cae45fd90 --- /dev/null +++ b/pym/portage/util/_async/AsyncScheduler.py @@ -0,0 +1,88 @@ +# Copyright 2012 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +from portage import os +from _emerge.AsynchronousTask import AsynchronousTask +from _emerge.PollScheduler import PollScheduler + +class AsyncScheduler(AsynchronousTask, PollScheduler): + + __slots__ = ('_error_count', '_loadavg_check_id', + '_max_jobs', '_max_load', + '_remaining_tasks', '_running_tasks', '_term_check_id') + + def __init__(self, max_jobs=None, max_load=None, **kwargs): + AsynchronousTask.__init__(self) + PollScheduler.__init__(self, **kwargs) + + if max_jobs is None: + max_jobs = 1 + self._max_jobs = max_jobs + self._max_load = max_load + self._error_count = 0 + self._running_tasks = set() + self._remaining_tasks = True + self._term_check_id = None + self._loadavg_check_id = None + + def _cancel(self): + self._terminated.set() + self._terminate_tasks() + + def _terminate_tasks(self): + for task in list(self._running_tasks): + task.cancel() + + def _next_task(self): + raise NotImplementedError(self) + + def _keep_scheduling(self): + return self._remaining_tasks and not self._terminated_tasks + + def _running_job_count(self): + return len(self._running_tasks) + + def _schedule_tasks(self): + while self._keep_scheduling() and self._can_add_job(): + try: + task = self._next_task() + except StopIteration: + self._remaining_tasks = False + else: + self._running_tasks.add(task) + task.scheduler = self.sched_iface + task.addExitListener(self._task_exit) + task.start() + + def _task_exit(self, task): + self._running_tasks.discard(task) + if task.returncode != os.EX_OK: + self._error_count += 1 + self._schedule() + + def _start(self): + self._term_check_id = self.sched_iface.idle_add(self._termination_check) + if self._max_load is not None: + # We have to schedule periodically, in case the load + # average has changed since the last call. + self._loadavg_check_id = self.sched_iface.timeout_add( + self._loadavg_latency, self._schedule) + self._schedule() + + def _wait(self): + # Loop while there are jobs to be scheduled. + while self._keep_scheduling(): + self.sched_iface.iteration() + + # Clean shutdown of previously scheduled jobs. In the + # case of termination, this allows for basic cleanup + # such as flushing of buffered output to logs. + while self._is_work_scheduled(): + self.sched_iface.iteration() + + if self._error_count > 0: + self.returncode = 1 + else: + self.returncode = os.EX_OK + + return self.returncode diff --git a/pym/portage/util/_async/ForkProcess.py b/pym/portage/util/_async/ForkProcess.py new file mode 100644 index 000000000..607d0ff57 --- /dev/null +++ b/pym/portage/util/_async/ForkProcess.py @@ -0,0 +1,48 @@ +# Copyright 2012 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import signal +import traceback + +import portage +from portage import os +from _emerge.SpawnProcess import SpawnProcess + +class ForkProcess(SpawnProcess): + + def _spawn(self, args, fd_pipes=None, **kwargs): + """ + Fork a subprocess, apply local settings, and call fetch(). + """ + + pid = os.fork() + if pid != 0: + if not isinstance(pid, int): + raise AssertionError( + "fork returned non-integer: %s" % (repr(pid),)) + portage.process.spawned_pids.append(pid) + return [pid] + + portage.locks._close_fds() + # Disable close_fds since we don't exec (see _setup_pipes docstring). + portage.process._setup_pipes(fd_pipes, close_fds=False) + + # Use default signal handlers in order to avoid problems + # killing subprocesses as reported in bug #353239. + signal.signal(signal.SIGINT, signal.SIG_DFL) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + + rval = 1 + try: + rval = self._run() + except SystemExit: + raise + except: + traceback.print_exc() + finally: + # Call os._exit() from finally block, in order to suppress any + # finally blocks from earlier in the call stack. See bug #345289. + os._exit(rval) + + def _run(self): + raise NotImplementedError(self) diff --git a/pym/portage/util/_async/__init__.py b/pym/portage/util/_async/__init__.py new file mode 100644 index 000000000..418ad862b --- /dev/null +++ b/pym/portage/util/_async/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2012 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 |