author    Zac Medico <zmedico@gentoo.org>  2008-06-30 12:08:16 +0000
committer Zac Medico <zmedico@gentoo.org>  2008-06-30 12:08:16 +0000
commit    e4edadf5ae7063f375d76be151c6d0e949980ecf (patch)
tree      795d459b82a7866b7656fc2a7772feca551d728b /pym
parent    9a4b31a59d24cd5aca28d326e44bce66683c9048 (diff)
Reimplement parallel-fetch by spawning the `ebuild fetch` command for each
ebuild. The benefit of this approach is that it can be integrated with the parallel build scheduling that is planned. Parallel-fetch support for binhost packages is not implemented yet, though it worked previously.

svn path=/main/trunk/; revision=10855
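At its core, the change replaces the single resumed-emerge fetch process with one `ebuild <path> fetch` child per package. A minimal stdlib-only sketch of that idea, using subprocess in place of portage.process.spawn (the ebuild path and log location here are hypothetical placeholders):

    import os
    import subprocess

    def spawn_fetcher(ebuild_path, log_path="/var/log/emerge-fetch.log"):
        # Mirror the environment tweaks that EbuildFetcherAsync.start() makes.
        env = dict(os.environ)
        env["FEATURES"] = env.get("FEATURES", "") + " -cvs"
        env["PORTAGE_NICENESS"] = "0"
        env["PORTAGE_PARALLEL_FETCHONLY"] = "1"
        log = open(log_path, "a")
        return subprocess.Popen(["ebuild", ebuild_path, "fetch"],
            env=env, stdout=log, stderr=subprocess.STDOUT)

    # One child per ebuild; the scheduler can poll each one while it runs.
    fetcher = spawn_fetcher("/usr/portage/app-misc/foo/foo-1.ebuild")
    fetcher.wait()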
Diffstat (limited to 'pym')
-rw-r--r--  pym/_emerge/__init__.py | 322
-rw-r--r--  pym/portage/__init__.py |   5
2 files changed, 234 insertions(+), 93 deletions(-)
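The new EbuildFetcherAsync class in the diff below forwards each child's output through a non-blocking pipe that a select.poll() loop drains into a shared log. A self-contained sketch of that pattern, with a placeholder echo command and log path standing in for the real fetcher:

    import fcntl
    import os
    import select
    import subprocess

    # Tie the child's stdout/stderr to the slave end of a pipe.
    master_fd, slave_fd = os.pipe()
    proc = subprocess.Popen(["echo", "fetching one distfile"],
        stdout=slave_fd, stderr=slave_fd)
    os.close(slave_fd)  # the parent only reads from the master end

    # Make the master end non-blocking, as EbuildFetcherAsync.start() does.
    flags = fcntl.fcntl(master_fd, fcntl.F_GETFL)
    fcntl.fcntl(master_fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    log = open("/tmp/parallel-fetch.log", "ab")
    poller = select.poll()
    poller.register(master_fd, select.POLLIN)

    done = False
    while not done:
        for fd, event in poller.poll():
            buf = os.read(fd, 4096)
            if buf:
                log.write(buf)
                log.flush()
            else:  # empty read: the child closed its end of the pipe
                poller.unregister(fd)
                done = True
    os.close(master_fd)
    log.close()
    proc.wait()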
diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py
index d2f34d8b9..b6c74b586 100644
--- a/pym/_emerge/__init__.py
+++ b/pym/_emerge/__init__.py
@@ -1433,26 +1433,141 @@ class _PackageMetadataWrapper(_PackageMetadataWrapperBase):
v = 0
self._pkg.mtime = v
-class EbuildFetcher(Task):
+class EbuildFetcher(SlotObject):
__slots__ = ("fetch_all", "pkg", "pretend", "settings")
- def _get_hash_key(self):
- hash_key = getattr(self, "_hash_key", None)
- if hash_key is None:
- self._hash_key = ("EbuildFetcher", self.pkg._get_hash_key())
- return self._hash_key
-
def execute(self):
portdb = self.pkg.root_config.trees["porttree"].dbapi
ebuild_path = portdb.findname(self.pkg.cpv)
debug = self.settings.get("PORTAGE_DEBUG") == "1"
+
retval = portage.doebuild(ebuild_path, "fetch",
- self.settings["ROOT"], self.settings, debug,
- self.pretend, fetchonly=1, fetchall=self.fetch_all,
+ self.settings["ROOT"], self.settings, debug=debug,
+ listonly=self.pretend, fetchonly=1, fetchall=self.fetch_all,
mydbapi=portdb, tree="porttree")
return retval
+class EbuildFetcherAsync(SlotObject):
+
+ __slots__ = ("log_file", "fd_pipes", "pkg",
+ "register", "unregister",
+ "pid", "returncode", "files")
+
+ _file_names = ("fetcher", "out")
+ _files_dict = slot_dict_class(_file_names)
+ _bufsize = 4096
+
+ def start(self):
+ # flush any pending output
+ fd_pipes = self.fd_pipes
+ if fd_pipes is None:
+ fd_pipes = {
+ 0 : sys.stdin.fileno(),
+ 1 : sys.stdout.fileno(),
+ 2 : sys.stderr.fileno(),
+ }
+
+ log_file = self.log_file
+ self.files = self._files_dict()
+ files = self.files
+
+ if log_file is not None:
+ files["out"] = open(log_file, "a")
+ portage.util.apply_secpass_permissions(log_file,
+ uid=portage.portage_uid, gid=portage.portage_gid,
+ mode=0660)
+ else:
+ for fd in fd_pipes.itervalues():
+ if fd == sys.stdout.fileno():
+ sys.stdout.flush()
+ if fd == sys.stderr.fileno():
+ sys.stderr.flush()
+
+ files["out"] = os.fdopen(os.dup(fd_pipes[1]), 'w')
+
+ master_fd, slave_fd = os.pipe()
+
+ import fcntl
+ fcntl.fcntl(master_fd, fcntl.F_SETFL,
+ fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
+
+ fd_pipes.setdefault(0, sys.stdin.fileno())
+ fd_pipes_orig = fd_pipes.copy()
+ fd_pipes[0] = fd_pipes_orig[0]
+ fd_pipes[1] = slave_fd
+ fd_pipes[2] = slave_fd
+
+ root_config = self.pkg.root_config
+ portdb = root_config.trees["porttree"].dbapi
+ ebuild_path = portdb.findname(self.pkg.cpv)
+ settings = root_config.settings
+
+ fetch_env = dict((k, settings[k]) for k in settings)
+ fetch_env["FEATURES"] = fetch_env.get("FEATURES", "") + " -cvs"
+ fetch_env["PORTAGE_NICENESS"] = "0"
+ fetch_env["PORTAGE_PARALLEL_FETCHONLY"] = "1"
+
+ ebuild_binary = os.path.join(
+ settings["EBUILD_BIN_PATH"], "ebuild")
+
+ fetch_args = [ebuild_binary, ebuild_path, "fetch"]
+ debug = settings.get("PORTAGE_DEBUG") == "1"
+ if debug:
+ fetch_args.append("--debug")
+
+ retval = portage.process.spawn(fetch_args, env=fetch_env,
+ fd_pipes=fd_pipes, returnpid=True)
+
+ self.pid = retval[0]
+
+ os.close(slave_fd)
+ files["fetcher"] = os.fdopen(master_fd, 'r')
+ self.register(files["fetcher"].fileno(),
+ select.POLLIN, self._output_handler)
+
+ def _output_handler(self, fd, event):
+ files = self.files
+ buf = array.array('B')
+ try:
+ buf.fromfile(files["fetcher"], self._bufsize)
+ except EOFError:
+ pass
+ if buf:
+ buf.tofile(files["out"])
+ files["out"].flush()
+ else:
+ self.unregister(files["fetcher"].fileno())
+ for f in files.values():
+ f.close()
+
+ def poll(self):
+ if self.returncode is not None:
+ return self.returncode
+ retval = os.waitpid(self.pid, os.WNOHANG)
+ if retval == (0, 0):
+ return None
+ self._set_returncode(retval)
+ return self.returncode
+
+ def wait(self):
+ if self.returncode is not None:
+ return self.returncode
+ self._set_returncode(os.waitpid(self.pid, 0))
+ return self.returncode
+
+ def _set_returncode(self, wait_retval):
+
+ retval = wait_retval[1]
+ portage.process.spawned_pids.remove(self.pid)
+ if retval != os.EX_OK:
+ if retval & 0xff:
+ retval = (retval & 0xff) << 8
+ else:
+ retval = retval >> 8
+
+ self.returncode = retval
+
class EbuildBuildDir(SlotObject):
__slots__ = ("pkg", "settings",
@@ -1566,9 +1681,12 @@ class EbuildBuild(Task):
ebuild_phase = EbuildPhase(fd_pipes=fd_pipes,
pkg=self.pkg, phase=mydo, register=self.register,
settings=settings, unregister=self.unregister)
+
ebuild_phase.start()
- self.schedule()
- retval = ebuild_phase.wait()
+ retval = None
+ while retval is None:
+ self.schedule()
+ retval = ebuild_phase.poll()
portage._post_phase_userpriv_perms(settings)
if mydo == "install":
@@ -1686,10 +1804,25 @@ class EbuildPhase(SlotObject):
for f in files.values():
f.close()
+ def poll(self):
+ if self.returncode is not None:
+ return self.returncode
+ retval = os.waitpid(self.pid, os.WNOHANG)
+ if retval == (0, 0):
+ return None
+ self._set_returncode(retval)
+ return self.returncode
+
def wait(self):
- pid = self.pid
- retval = os.waitpid(pid, 0)[1]
- portage.process.spawned_pids.remove(pid)
+ if self.returncode is not None:
+ return self.returncode
+ self._set_returncode(os.waitpid(self.pid, 0))
+ return self.returncode
+
+ def _set_returncode(self, wait_retval):
+
+ retval = wait_retval[1]
+ portage.process.spawned_pids.remove(self.pid)
if retval != os.EX_OK:
if retval & 0xff:
retval = (retval & 0xff) << 8
@@ -1706,7 +1839,6 @@ class EbuildPhase(SlotObject):
eerror(l, phase=self.phase, key=self.pkg.cpv)
self.returncode = retval
- return self.returncode
class EbuildBinpkg(Task):
"""
@@ -6327,6 +6459,8 @@ class Scheduler(object):
"--fetchonly", "--fetch-all-uri",
"--nodeps", "--pretend"])
+ _fetch_log = "/var/log/emerge-fetch.log"
+
def __init__(self, settings, trees, mtimedb, myopts,
spinner, mergelist, favorites, digraph):
self.settings = settings
@@ -6345,9 +6479,38 @@ class Scheduler(object):
self.pkgsettings[root] = portage.config(
clone=trees[root]["vartree"].settings)
self.curval = 0
- self._spawned_pids = []
self._poll_event_handlers = {}
self._poll = select.poll()
+ from collections import deque
+ self._task_queue = deque()
+ self._running_tasks = set()
+ self._max_jobs = 1
+ self._parallel_fetch = False
+ features = self.settings.features
+ if "parallel-fetch" in features and \
+ not ("--pretend" in self.myopts or \
+ "--fetch-all-uri" in self.myopts or \
+ "--fetchonly" in self.myopts):
+ if "distlocks" not in features:
+ portage.writemsg(red("!!!")+"\n", noiselevel=-1)
+ portage.writemsg(red("!!!")+" parallel-fetching " + \
+ "requires the distlocks feature enabled"+"\n",
+ noiselevel=-1)
+ portage.writemsg(red("!!!")+" you have it disabled, " + \
+ "thus parallel-fetching is being disabled"+"\n",
+ noiselevel=-1)
+ portage.writemsg(red("!!!")+"\n", noiselevel=-1)
+ elif len(mergelist) > 1:
+ self._parallel_fetch = True
+
+ # clear out existing fetch log if it exists
+ try:
+ open(self._fetch_log, 'w')
+ except EnvironmentError:
+ pass
+
+ def _add_task(self, task):
+ self._task_queue.append(task)
class _pkg_failure(portage.exception.PortageException):
"""
@@ -6400,20 +6563,18 @@ class Scheduler(object):
def merge(self):
keep_going = "--keep-going" in self.myopts
+ running_tasks = self._running_tasks
while True:
try:
rval = self._merge()
finally:
- spawned_pids = self._spawned_pids
- while spawned_pids:
- pid = spawned_pids.pop()
- try:
- if os.waitpid(pid, os.WNOHANG) == (0, 0):
- os.kill(pid, signal.SIGTERM)
- os.waitpid(pid, 0)
- except OSError:
- pass # cleaned up elsewhere.
+			# clean up child processes if necessary
+ while running_tasks:
+ task = running_tasks.pop()
+ if task.poll() is None:
+ os.kill(task.pid, signal.SIGTERM)
+ task.wait()
if rval == os.EX_OK or not keep_going:
break
@@ -6493,25 +6654,6 @@ class Scheduler(object):
mydepgraph.break_refs(dropped_tasks)
return (mylist, dropped_tasks)
- def _poll_child_processes(self):
- """
- After each merge, collect status from child processes
- in order to clean up zombies (such as the parallel-fetch
- process).
- """
- spawned_pids = self._spawned_pids
- if not spawned_pids:
- return
- for pid in list(spawned_pids):
- try:
- if os.waitpid(pid, os.WNOHANG) == (0, 0):
- continue
- except OSError:
- # This pid has been cleaned up elsewhere,
- # so remove it from our list.
- pass
- spawned_pids.remove(pid)
-
def _register(self, f, eventmask, handler):
self._poll_event_handlers[f] = handler
self._poll.register(f, eventmask)
@@ -6519,11 +6661,43 @@ class Scheduler(object):
def _unregister(self, f):
self._poll.unregister(f)
del self._poll_event_handlers[f]
+ self._schedule_tasks()
def _schedule(self):
- while self._poll_event_handlers:
- for f, event in self._poll.poll():
- self._poll_event_handlers[f](f, event)
+ event_handlers = self._poll_event_handlers
+ running_tasks = self._running_tasks
+ poll = self._poll.poll
+
+ self._schedule_tasks()
+
+ while event_handlers:
+ for f, event in poll():
+ event_handlers[f](f, event)
+
+ if len(event_handlers) <= len(running_tasks):
+ # Assuming one handler per task, this
+			# means the caller has unregistered its
+ # handler, so it's time to yield.
+ break
+
+ def _schedule_tasks(self):
+ task_queue = self._task_queue
+ running_tasks = self._running_tasks
+ max_jobs = self._max_jobs
+ state_changed = False
+
+ for task in list(running_tasks):
+ if task.poll() is not None:
+ running_tasks.remove(task)
+ state_changed = True
+
+ while task_queue and (len(running_tasks) < max_jobs):
+ task = task_queue.popleft()
+ task.start()
+ running_tasks.add(task)
+ state_changed = True
+
+ return state_changed
def _merge(self):
mylist = self._mergelist
@@ -6551,6 +6725,16 @@ class Scheduler(object):
if isinstance(x, Package) and x.operation == "merge"]
mtimedb.commit()
+ if self._parallel_fetch:
+ for pkg in mylist:
+ if not isinstance(pkg, Package) or \
+ not pkg.type_name == "ebuild":
+ continue
+
+ self._add_task(EbuildFetcherAsync(log_file=self._fetch_log,
+ pkg=pkg, register=self._register,
+ unregister=self._unregister))
+
# Verify all the manifests now so that the user is notified of failure
# as soon as possible.
if "--fetchonly" not in self.myopts and \
@@ -6584,49 +6768,6 @@ class Scheduler(object):
myfeat = self.settings.features[:]
bad_resume_opts = set(["--ask", "--changelog", "--skipfirst",
"--resume"])
- if "parallel-fetch" in myfeat and \
- not ("--pretend" in self.myopts or \
- "--fetch-all-uri" in self.myopts or \
- "--fetchonly" in self.myopts):
- if "distlocks" not in myfeat:
- print red("!!!")
- print red("!!!")+" parallel-fetching requires the distlocks feature enabled"
- print red("!!!")+" you have it disabled, thus parallel-fetching is being disabled"
- print red("!!!")
- elif len(mymergelist) > 1:
- fetch_log = "/var/log/emerge-fetch.log"
- logfile = open(fetch_log, "w")
- fd_pipes = {1:logfile.fileno(), 2:logfile.fileno()}
- portage.util.apply_secpass_permissions(fetch_log,
- uid=portage.portage_uid, gid=portage.portage_gid,
- mode=0660)
- fetch_env = os.environ.copy()
- fetch_env["FEATURES"] = fetch_env.get("FEATURES", "") + " -cvs"
- fetch_env["PORTAGE_NICENESS"] = "0"
- fetch_env["PORTAGE_PARALLEL_FETCHONLY"] = "1"
- fetch_args = [sys.argv[0], "--resume",
- "--fetchonly", "--nodeps"]
- resume_opts = self.myopts.copy()
- # For automatic resume, we need to prevent
- # any of bad_resume_opts from leaking in
- # via EMERGE_DEFAULT_OPTS.
- resume_opts["--ignore-default-opts"] = True
- for myopt, myarg in resume_opts.iteritems():
- if myopt not in bad_resume_opts:
- if myarg is True:
- fetch_args.append(myopt)
- else:
- fetch_args.append(myopt +"="+ myarg)
- self._spawned_pids.extend(
- portage.process.spawn(
- fetch_args, env=fetch_env,
- fd_pipes=fd_pipes, returnpid=True))
- logfile.close() # belongs to the spawned process
- del fetch_log, logfile, fd_pipes, fetch_env, fetch_args, \
- resume_opts
- print ">>> starting parallel fetching pid %d" % \
- self._spawned_pids[-1]
-
metadata_keys = [k for k in portage.auxdbkeys \
if not k.startswith("UNUSED_")] + ["USE"]
@@ -6926,7 +7067,6 @@ class Scheduler(object):
# due to power failure, SIGKILL, etc...
mtimedb.commit()
self.curval += 1
- self._poll_child_processes()
def _post_merge(self, mtimedb, xterm_titles, failed_fetches):
if "--pretend" not in self.myopts:
diff --git a/pym/portage/__init__.py b/pym/portage/__init__.py
index 456f4e013..50711da28 100644
--- a/pym/portage/__init__.py
+++ b/pym/portage/__init__.py
@@ -3274,8 +3274,9 @@ def fetch(myuris, mysettings, listonly=0, fetchonly=0, locks_in_subdir=".locks",
# file size. The parent process will verify their checksums prior to
# the unpack phase.
- parallel_fetchonly = fetchonly and \
- "PORTAGE_PARALLEL_FETCHONLY" in mysettings
+ parallel_fetchonly = "PORTAGE_PARALLEL_FETCHONLY" in mysettings
+ if parallel_fetchonly:
+ fetchonly = 1
check_config_instance(mysettings)
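Finally, the Scheduler._schedule_tasks() loop added above is a small, reusable pattern: reap finished tasks first, then start queued ones until max_jobs are running. A self-contained sketch with a stub task standing in for EbuildFetcherAsync (the stub class and its timings are invented for illustration):

    from collections import deque

    class StubTask(object):
        # Same start()/poll() shape as EbuildFetcherAsync.
        def __init__(self, name):
            self.name = name
            self._polls_left = 2  # pretend the task runs two poll cycles
        def start(self):
            print("starting %s" % self.name)
        def poll(self):
            self._polls_left -= 1
            return None if self._polls_left > 0 else 0  # 0 = clean exit

    task_queue = deque(StubTask("fetch-%d" % i) for i in range(4))
    running_tasks = set()
    max_jobs = 1

    while task_queue or running_tasks:
        # Reap finished tasks first...
        for task in list(running_tasks):
            if task.poll() is not None:
                running_tasks.remove(task)
        # ...then start queued ones up to the job limit.
        while task_queue and len(running_tasks) < max_jobs:
            task = task_queue.popleft()
            task.start()
            running_tasks.add(task)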