From 43833b58736bef2565250f3965a22ab747ba6d0b Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Tue, 1 Jul 2008 09:47:32 +0000 Subject: Add a new BinpkgFetcherAsync class and use it to implement parellel-fetch for --getbinpkg. svn path=/main/trunk/; revision=10868 --- pym/_emerge/__init__.py | 255 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 240 insertions(+), 15 deletions(-) diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py index acaf16287..9aea4415f 100644 --- a/pym/_emerge/__init__.py +++ b/pym/_emerge/__init__.py @@ -21,7 +21,11 @@ except KeyboardInterrupt: sys.exit(1) import array +import fcntl import select +import shlex +import urlparse +import weakref import gc import os, stat import platform @@ -1991,6 +1995,195 @@ class BinpkgFetcher(Task): rval = 1 return rval +class BinpkgFetcherAsync(SlotObject): + + __slots__ = ("cancelled", "log_file", "fd_pipes", "pkg", + "register", "unregister", + "locked", "files", "pid", "pkg_path", "returncode", "_lock_obj") + + _file_names = ("fetcher", "out") + _files_dict = slot_dict_class(_file_names) + _bufsize = 4096 + + def __init__(self, **kwargs): + SlotObject.__init__(self, **kwargs) + pkg = self.pkg + self.pkg_path = pkg.root_config.trees["bintree"].getname(pkg.cpv) + + def start(self): + + if self.cancelled: + self.pid = -1 + return + + fd_pipes = self.fd_pipes + if fd_pipes is None: + fd_pipes = { + 0 : sys.stdin.fileno(), + 1 : sys.stdout.fileno(), + 2 : sys.stderr.fileno(), + } + + log_file = self.log_file + self.files = self._files_dict() + files = self.files + + if log_file is not None: + files["out"] = open(log_file, "a") + portage.util.apply_secpass_permissions(log_file, + uid=portage.portage_uid, gid=portage.portage_gid, + mode=0660) + else: + # flush any pending output + for fd in fd_pipes.itervalues(): + if fd == sys.stdout.fileno(): + sys.stdout.flush() + if fd == sys.stderr.fileno(): + sys.stderr.flush() + + files["out"] = os.fdopen(os.dup(fd_pipes[1]), 'w') + + master_fd, slave_fd = os.pipe() + fcntl.fcntl(master_fd, fcntl.F_SETFL, + fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK) + + fd_pipes.setdefault(0, sys.stdin.fileno()) + fd_pipes_orig = fd_pipes.copy() + fd_pipes[0] = fd_pipes_orig[0] + fd_pipes[1] = slave_fd + fd_pipes[2] = slave_fd + + pkg = self.pkg + bintree = pkg.root_config.trees["bintree"] + settings = bintree.settings + use_locks = "distlocks" in settings.features + pkg_path = self.pkg_path + resume = os.path.exists(pkg_path) + + # urljoin doesn't work correctly with + # unrecognized protocols like sftp + if bintree._remote_has_index: + rel_uri = bintree._remotepkgs[pkg.cpv].get("PATH") + if not rel_uri: + rel_uri = pkg.cpv + ".tbz2" + uri = bintree._remote_base_uri.rstrip("/") + \ + "/" + rel_uri.lstrip("/") + else: + uri = settings["PORTAGE_BINHOST"].rstrip("/") + \ + "/" + pkg.pf + ".tbz2" + + protocol = urlparse.urlparse(uri)[0] + fcmd_prefix = "FETCHCOMMAND" + if resume: + fcmd_prefix = "RESUMECOMMAND" + fcmd = settings.get(fcmd_prefix + "_" + protocol.upper()) + if not fcmd: + fcmd = settings.get(fcmd_prefix) + + fcmd_vars = { + "DISTDIR" : os.path.dirname(pkg_path), + "URI" : uri, + "FILE" : os.path.basename(pkg_path) + } + + fetch_env = dict((k, settings[k]) for k in settings) + fetch_args = [portage.util.varexpand(x, mydict=fcmd_vars) \ + for x in shlex.split(fcmd)] + + portage.util.ensure_dirs(os.path.dirname(pkg_path)) + if use_locks: + self.lock() + + retval = portage.process.spawn(fetch_args, env=fetch_env, + fd_pipes=fd_pipes, returnpid=True) + + self.pid = retval[0] + + os.close(slave_fd) + files["fetcher"] = os.fdopen(master_fd, 'r') + self.register(files["fetcher"].fileno(), + select.POLLIN, self._output_handler) + + def _output_handler(self, fd, event): + files = self.files + buf = array.array('B') + try: + buf.fromfile(files["fetcher"], self._bufsize) + except EOFError: + pass + if buf: + buf.tofile(files["out"]) + files["out"].flush() + else: + self.unregister(files["fetcher"].fileno()) + for f in files.values(): + f.close() + if self.locked: + self.unlock() + + def lock(self): + """ + This raises an AlreadyLocked exception if lock() is called + while a lock is already held. In order to avoid this, call + unlock() or check whether the "locked" attribute is True + or False before calling lock(). + """ + if self._lock_obj is not None: + raise self.AlreadyLocked((self._lock_obj,)) + + self._lock_obj = portage.locks.lockfile( + self.pkg_path, wantnewlockfile=1) + self.locked = True + + class AlreadyLocked(portage.exception.PortageException): + pass + + def unlock(self): + if self._lock_obj is None: + return + portage.locks.unlockfile(self._lock_obj) + self._lock_obj = None + self.locked = False + + def poll(self): + if self.returncode is not None: + return self.returncode + retval = os.waitpid(self.pid, os.WNOHANG) + if retval == (0, 0): + return None + self._set_returncode(retval) + return self.returncode + + def cancel(self): + if self.isAlive(): + os.kill(self.pid, signal.SIGTERM) + self.cancelled = True + if self.pid is not None: + self.wait() + return self.returncode + + def isAlive(self): + return self.pid is not None and \ + self.returncode is None + + def wait(self): + if self.returncode is not None: + return self.returncode + self._set_returncode(os.waitpid(self.pid, 0)) + return self.returncode + + def _set_returncode(self, wait_retval): + + retval = wait_retval[1] + portage.process.spawned_pids.remove(self.pid) + if retval != os.EX_OK: + if retval & 0xff: + retval = (retval & 0xff) << 8 + else: + retval = retval >> 8 + + self.returncode = retval + class BinpkgMerge(Task): __slots__ = ("find_blockers", "ldpath_mtimes", @@ -6573,9 +6766,7 @@ class Scheduler(object): self._task_queue.clear() while running_tasks: task = running_tasks.pop() - if task.poll() is None: - os.kill(task.pid, signal.SIGTERM) - task.wait() + task.cancel() if rval == os.EX_OK or not keep_going: break @@ -6694,8 +6885,10 @@ class Scheduler(object): while task_queue and (len(running_tasks) < max_jobs): task = task_queue.popleft() - task.start() - running_tasks.add(task) + cancelled = getattr(task, "cancelled", None) + if not cancelled: + task.start() + running_tasks.add(task) state_changed = True return state_changed @@ -6726,15 +6919,27 @@ class Scheduler(object): if isinstance(x, Package) and x.operation == "merge"] mtimedb.commit() + prefetchers = weakref.WeakValueDictionary() + getbinpkg = "--getbinpkg" in self.myopts + if self._parallel_fetch: for pkg in mylist: - if not isinstance(pkg, Package) or \ - not pkg.type_name == "ebuild": + if not isinstance(pkg, Package): continue - - self._add_task(EbuildFetcherAsync(log_file=self._fetch_log, - pkg=pkg, register=self._register, - unregister=self._unregister)) + if pkg.type_name == "ebuild": + self._add_task(EbuildFetcherAsync( + log_file=self._fetch_log, + pkg=pkg, register=self._register, + unregister=self._unregister)) + elif pkg.type_name == "binary" and getbinpkg and \ + pkg.root_config.trees["bintree"].isremote(pkg.cpv): + prefetcher = BinpkgFetcherAsync( + log_file=self._fetch_log, + pkg=pkg, register=self._register, + unregister=self._unregister) + prefetchers[pkg] = prefetcher + self._add_task(prefetcher) + del prefetcher # Verify all the manifests now so that the user is notified of failure # as soon as possible. @@ -6805,14 +7010,15 @@ class Scheduler(object): self._execute_task(bad_resume_opts, failed_fetches, mydbapi, mergecount, - myfeat, mymergelist, x, xterm_titles) + myfeat, mymergelist, x, + prefetchers, xterm_titles) except self._pkg_failure, e: return e.status return self._post_merge(mtimedb, xterm_titles, failed_fetches) def _execute_task(self, bad_resume_opts, failed_fetches, mydbapi, mergecount, myfeat, - mymergelist, pkg, xterm_titles): + mymergelist, pkg, prefetchers, xterm_titles): favorites = self._favorites mtimedb = self._mtimedb from portage.elog import elog_process @@ -6963,8 +7169,27 @@ class Scheduler(object): phasefilter=filter_mergephases) build_dir.unlock() - elif x[0]=="binary": - #merge the tbz2 + elif x.type_name == "binary": + # The prefetcher have already completed or it + # could be running now. If it's running now, + # wait for it to complete since it holds + # a lock on the file being fetched. The + # portage.locks functions are only designed + # to work between separate processes. Since + # the lock is held by the current process, + # use the scheduler and fetcher methods to + # synchronize with the fetcher. + prefetcher = prefetchers.get(pkg) + if prefetcher is not None: + if not prefetcher.isAlive(): + prefetcher.cancel() + else: + retval = None + while retval is None: + self._schedule() + retval = prefetcher.poll() + del prefetcher + fetcher = BinpkgFetcher(pkg=pkg, pretend=pretend, use_locks=("distlocks" in pkgsettings.features)) mytbz2 = fetcher.pkg_path -- cgit v1.2.3-1-g7c22