summaryrefslogtreecommitdiffstats
path: root/pym
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2008-07-01 09:47:32 +0000
committerZac Medico <zmedico@gentoo.org>2008-07-01 09:47:32 +0000
commit43833b58736bef2565250f3965a22ab747ba6d0b (patch)
tree40aa33e23d92e1de640798a286fd93013d6bd854 /pym
parent1b14d63fb6cc42b4251954999728b2af9c46a3b6 (diff)
downloadportage-43833b58736bef2565250f3965a22ab747ba6d0b.tar.gz
portage-43833b58736bef2565250f3965a22ab747ba6d0b.tar.bz2
portage-43833b58736bef2565250f3965a22ab747ba6d0b.zip
Add a new BinpkgFetcherAsync class and use it to implement parallel-fetch
for --getbinpkg. svn path=/main/trunk/; revision=10868
Diffstat (limited to 'pym')
-rw-r--r--pym/_emerge/__init__.py255
1 files changed, 240 insertions, 15 deletions
diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py
index acaf16287..9aea4415f 100644
--- a/pym/_emerge/__init__.py
+++ b/pym/_emerge/__init__.py
@@ -21,7 +21,11 @@ except KeyboardInterrupt:
sys.exit(1)
import array
+import fcntl
import select
+import shlex
+import urlparse
+import weakref
import gc
import os, stat
import platform
@@ -1991,6 +1995,195 @@ class BinpkgFetcher(Task):
rval = 1
return rval
+class BinpkgFetcherAsync(SlotObject):
+
+ __slots__ = ("cancelled", "log_file", "fd_pipes", "pkg",
+ "register", "unregister",
+ "locked", "files", "pid", "pkg_path", "returncode", "_lock_obj")
+
+ _file_names = ("fetcher", "out")
+ _files_dict = slot_dict_class(_file_names)
+ _bufsize = 4096
+
+ def __init__(self, **kwargs):
+ SlotObject.__init__(self, **kwargs)
+ pkg = self.pkg
+ self.pkg_path = pkg.root_config.trees["bintree"].getname(pkg.cpv)
+
+ def start(self):
+
+ if self.cancelled:
+ self.pid = -1
+ return
+
+ fd_pipes = self.fd_pipes
+ if fd_pipes is None:
+ fd_pipes = {
+ 0 : sys.stdin.fileno(),
+ 1 : sys.stdout.fileno(),
+ 2 : sys.stderr.fileno(),
+ }
+
+ log_file = self.log_file
+ self.files = self._files_dict()
+ files = self.files
+
+ if log_file is not None:
+ files["out"] = open(log_file, "a")
+ portage.util.apply_secpass_permissions(log_file,
+ uid=portage.portage_uid, gid=portage.portage_gid,
+ mode=0660)
+ else:
+ # flush any pending output
+ for fd in fd_pipes.itervalues():
+ if fd == sys.stdout.fileno():
+ sys.stdout.flush()
+ if fd == sys.stderr.fileno():
+ sys.stderr.flush()
+
+ files["out"] = os.fdopen(os.dup(fd_pipes[1]), 'w')
+
+ master_fd, slave_fd = os.pipe()
+ fcntl.fcntl(master_fd, fcntl.F_SETFL,
+ fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
+
+ fd_pipes.setdefault(0, sys.stdin.fileno())
+ fd_pipes_orig = fd_pipes.copy()
+ fd_pipes[0] = fd_pipes_orig[0]
+ fd_pipes[1] = slave_fd
+ fd_pipes[2] = slave_fd
+
+ pkg = self.pkg
+ bintree = pkg.root_config.trees["bintree"]
+ settings = bintree.settings
+ use_locks = "distlocks" in settings.features
+ pkg_path = self.pkg_path
+ resume = os.path.exists(pkg_path)
+
+ # urljoin doesn't work correctly with
+ # unrecognized protocols like sftp
+ if bintree._remote_has_index:
+ rel_uri = bintree._remotepkgs[pkg.cpv].get("PATH")
+ if not rel_uri:
+ rel_uri = pkg.cpv + ".tbz2"
+ uri = bintree._remote_base_uri.rstrip("/") + \
+ "/" + rel_uri.lstrip("/")
+ else:
+ uri = settings["PORTAGE_BINHOST"].rstrip("/") + \
+ "/" + pkg.pf + ".tbz2"
+
+ protocol = urlparse.urlparse(uri)[0]
+ fcmd_prefix = "FETCHCOMMAND"
+ if resume:
+ fcmd_prefix = "RESUMECOMMAND"
+ fcmd = settings.get(fcmd_prefix + "_" + protocol.upper())
+ if not fcmd:
+ fcmd = settings.get(fcmd_prefix)
+
+ fcmd_vars = {
+ "DISTDIR" : os.path.dirname(pkg_path),
+ "URI" : uri,
+ "FILE" : os.path.basename(pkg_path)
+ }
+
+ fetch_env = dict((k, settings[k]) for k in settings)
+ fetch_args = [portage.util.varexpand(x, mydict=fcmd_vars) \
+ for x in shlex.split(fcmd)]
+
+ portage.util.ensure_dirs(os.path.dirname(pkg_path))
+ if use_locks:
+ self.lock()
+
+ retval = portage.process.spawn(fetch_args, env=fetch_env,
+ fd_pipes=fd_pipes, returnpid=True)
+
+ self.pid = retval[0]
+
+ os.close(slave_fd)
+ files["fetcher"] = os.fdopen(master_fd, 'r')
+ self.register(files["fetcher"].fileno(),
+ select.POLLIN, self._output_handler)
+
+ def _output_handler(self, fd, event):
+ files = self.files
+ buf = array.array('B')
+ try:
+ buf.fromfile(files["fetcher"], self._bufsize)
+ except EOFError:
+ pass
+ if buf:
+ buf.tofile(files["out"])
+ files["out"].flush()
+ else:
+ self.unregister(files["fetcher"].fileno())
+ for f in files.values():
+ f.close()
+ if self.locked:
+ self.unlock()
+
+ def lock(self):
+ """
+ This raises an AlreadyLocked exception if lock() is called
+ while a lock is already held. In order to avoid this, call
+ unlock() or check whether the "locked" attribute is True
+ or False before calling lock().
+ """
+ if self._lock_obj is not None:
+ raise self.AlreadyLocked((self._lock_obj,))
+
+ self._lock_obj = portage.locks.lockfile(
+ self.pkg_path, wantnewlockfile=1)
+ self.locked = True
+
+ class AlreadyLocked(portage.exception.PortageException):
+ pass
+
+ def unlock(self):
+ if self._lock_obj is None:
+ return
+ portage.locks.unlockfile(self._lock_obj)
+ self._lock_obj = None
+ self.locked = False
+
+ def poll(self):
+ if self.returncode is not None:
+ return self.returncode
+ retval = os.waitpid(self.pid, os.WNOHANG)
+ if retval == (0, 0):
+ return None
+ self._set_returncode(retval)
+ return self.returncode
+
+ def cancel(self):
+ if self.isAlive():
+ os.kill(self.pid, signal.SIGTERM)
+ self.cancelled = True
+ if self.pid is not None:
+ self.wait()
+ return self.returncode
+
+ def isAlive(self):
+ return self.pid is not None and \
+ self.returncode is None
+
+ def wait(self):
+ if self.returncode is not None:
+ return self.returncode
+ self._set_returncode(os.waitpid(self.pid, 0))
+ return self.returncode
+
+ def _set_returncode(self, wait_retval):
+
+ retval = wait_retval[1]
+ portage.process.spawned_pids.remove(self.pid)
+ if retval != os.EX_OK:
+ if retval & 0xff:
+ retval = (retval & 0xff) << 8
+ else:
+ retval = retval >> 8
+
+ self.returncode = retval
+
class BinpkgMerge(Task):
__slots__ = ("find_blockers", "ldpath_mtimes",
@@ -6573,9 +6766,7 @@ class Scheduler(object):
self._task_queue.clear()
while running_tasks:
task = running_tasks.pop()
- if task.poll() is None:
- os.kill(task.pid, signal.SIGTERM)
- task.wait()
+ task.cancel()
if rval == os.EX_OK or not keep_going:
break
@@ -6694,8 +6885,10 @@ class Scheduler(object):
while task_queue and (len(running_tasks) < max_jobs):
task = task_queue.popleft()
- task.start()
- running_tasks.add(task)
+ cancelled = getattr(task, "cancelled", None)
+ if not cancelled:
+ task.start()
+ running_tasks.add(task)
state_changed = True
return state_changed
@@ -6726,15 +6919,27 @@ class Scheduler(object):
if isinstance(x, Package) and x.operation == "merge"]
mtimedb.commit()
+ prefetchers = weakref.WeakValueDictionary()
+ getbinpkg = "--getbinpkg" in self.myopts
+
if self._parallel_fetch:
for pkg in mylist:
- if not isinstance(pkg, Package) or \
- not pkg.type_name == "ebuild":
+ if not isinstance(pkg, Package):
continue
-
- self._add_task(EbuildFetcherAsync(log_file=self._fetch_log,
- pkg=pkg, register=self._register,
- unregister=self._unregister))
+ if pkg.type_name == "ebuild":
+ self._add_task(EbuildFetcherAsync(
+ log_file=self._fetch_log,
+ pkg=pkg, register=self._register,
+ unregister=self._unregister))
+ elif pkg.type_name == "binary" and getbinpkg and \
+ pkg.root_config.trees["bintree"].isremote(pkg.cpv):
+ prefetcher = BinpkgFetcherAsync(
+ log_file=self._fetch_log,
+ pkg=pkg, register=self._register,
+ unregister=self._unregister)
+ prefetchers[pkg] = prefetcher
+ self._add_task(prefetcher)
+ del prefetcher
# Verify all the manifests now so that the user is notified of failure
# as soon as possible.
@@ -6805,14 +7010,15 @@ class Scheduler(object):
self._execute_task(bad_resume_opts,
failed_fetches,
mydbapi, mergecount,
- myfeat, mymergelist, x, xterm_titles)
+ myfeat, mymergelist, x,
+ prefetchers, xterm_titles)
except self._pkg_failure, e:
return e.status
return self._post_merge(mtimedb, xterm_titles, failed_fetches)
def _execute_task(self, bad_resume_opts,
failed_fetches, mydbapi, mergecount, myfeat,
- mymergelist, pkg, xterm_titles):
+ mymergelist, pkg, prefetchers, xterm_titles):
favorites = self._favorites
mtimedb = self._mtimedb
from portage.elog import elog_process
@@ -6963,8 +7169,27 @@ class Scheduler(object):
phasefilter=filter_mergephases)
build_dir.unlock()
- elif x[0]=="binary":
- #merge the tbz2
+ elif x.type_name == "binary":
+ # The prefetcher may have already completed or it
+ # could be running now. If it's running now,
+ # wait for it to complete since it holds
+ # a lock on the file being fetched. The
+ # portage.locks functions are only designed
+ # to work between separate processes. Since
+ # the lock is held by the current process,
+ # use the scheduler and fetcher methods to
+ # synchronize with the fetcher.
+ prefetcher = prefetchers.get(pkg)
+ if prefetcher is not None:
+ if not prefetcher.isAlive():
+ prefetcher.cancel()
+ else:
+ retval = None
+ while retval is None:
+ self._schedule()
+ retval = prefetcher.poll()
+ del prefetcher
+
fetcher = BinpkgFetcher(pkg=pkg, pretend=pretend,
use_locks=("distlocks" in pkgsettings.features))
mytbz2 = fetcher.pkg_path