Diffstat (limited to 'pym/_emerge/__init__.py')
-rw-r--r--  pym/_emerge/__init__.py  461
1 file changed, 326 insertions, 135 deletions
diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py
index 0cd94208d..7f122bb4d 100644
--- a/pym/_emerge/__init__.py
+++ b/pym/_emerge/__init__.py
@@ -38,7 +38,7 @@ except ImportError:
from os import path as osp
sys.path.insert(0, osp.join(osp.dirname(osp.dirname(osp.realpath(__file__))), "pym"))
import portage
-portage._disable_legacy_globals()
+
from portage import digraph, portdbapi
from portage.const import NEWS_LIB_PATH, CACHE_PATH, PRIVATE_PATH, USER_CONFIG_PATH, GLOBAL_CONFIG_PATH
@@ -2292,6 +2292,95 @@ class EbuildExecuter(CompositeTask):
self._start_task(ebuild_phases, self._default_final_exit)
+class EbuildMetadataPhase(SubProcess):
+
+ """
+ Asynchronous interface for the ebuild "depend" phase, which is
+ used to extract metadata from the ebuild.
+ """
+
+ __slots__ = ("cpv", "ebuild_path", "fd_pipes", "metadata_callback",
+ "ebuild_mtime", "portdb", "repo_path", "settings") + \
+ ("files", "_raw_metadata")
+
+ _file_names = ("ebuild",)
+ _files_dict = slot_dict_class(_file_names, prefix="")
+ _bufsize = SpawnProcess._bufsize
+ _metadata_fd = 9
+
+ def start(self):
+ settings = self.settings
+ ebuild_path = self.ebuild_path
+ debug = settings.get("PORTAGE_DEBUG") == "1"
+ master_fd = None
+ slave_fd = None
+ fd_pipes = None
+ if self.fd_pipes is not None:
+ fd_pipes = self.fd_pipes.copy()
+ else:
+ fd_pipes = {}
+
+ fd_pipes.setdefault(0, sys.stdin.fileno())
+ fd_pipes.setdefault(1, sys.stdout.fileno())
+ fd_pipes.setdefault(2, sys.stderr.fileno())
+
+ # flush any pending output
+ for fd in fd_pipes.itervalues():
+ if fd == sys.stdout.fileno():
+ sys.stdout.flush()
+ if fd == sys.stderr.fileno():
+ sys.stderr.flush()
+
+ fd_pipes_orig = fd_pipes.copy()
+ self.files = self._files_dict()
+ files = self.files
+
+ master_fd, slave_fd = os.pipe()
+ fcntl.fcntl(master_fd, fcntl.F_SETFL,
+ fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
+
+ fd_pipes[self._metadata_fd] = slave_fd
+
+ retval = portage.doebuild(ebuild_path, "depend",
+ settings["ROOT"], settings, debug,
+ mydbapi=self.portdb, tree="porttree",
+ fd_pipes=fd_pipes, returnpid=True)
+
+ os.close(slave_fd)
+
+ if isinstance(retval, int):
+ # doebuild failed before spawning
+ os.close(master_fd)
+ self.returncode = retval
+ self.wait()
+ return
+
+ self.pid = retval[0]
+ portage.process.spawned_pids.remove(self.pid)
+
+ self._raw_metadata = []
+ files.ebuild = os.fdopen(master_fd, 'r')
+ self._reg_id = self.scheduler.register(files.ebuild.fileno(),
+ PollConstants.POLLIN, self._output_handler)
+ self.registered = True
+
+ def _output_handler(self, fd, event):
+ files = self.files
+ self._raw_metadata.append(files.ebuild.read())
+ if not self._raw_metadata[-1]:
+ for f in files.values():
+ f.close()
+ self.registered = False
+ self._wait()
+
+ if self.returncode == os.EX_OK:
+ metadata = izip(portage.auxdbkeys,
+ "".join(self._raw_metadata).splitlines())
+ self.metadata_callback(self.cpv, self.ebuild_path,
+ self.repo_path, metadata, self.ebuild_mtime)
+
+ return self.registered
+
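
For orientation (commentary, not part of the diff): the "depend" phase child writes one value per line to the dedicated metadata pipe (fd 9 on the child side), in the same order as portage.auxdbkeys, and the parent pairs keys with lines once it reads EOF. A minimal sketch of that parsing step, in the Python 2 idiom of this codebase (parse_depend_output is a hypothetical helper, not a portage API):

    from itertools import izip

    def parse_depend_output(raw_chunks, auxdbkeys):
        """Pair each auxdb key with the corresponding line of raw
        "depend" output collected from the metadata pipe."""
        lines = "".join(raw_chunks).splitlines()
        return dict(izip(auxdbkeys, lines))

    # Roughly what _output_handler does once it sees EOF:
    # metadata = parse_depend_output(self._raw_metadata, portage.auxdbkeys)
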
class EbuildPhase(SubProcess):
__slots__ = ("fd_pipes", "phase", "pkg",
@@ -2387,6 +2476,15 @@ class EbuildPhase(SubProcess):
mydbapi=mydbapi, tree=tree,
fd_pipes=fd_pipes, returnpid=True)
+ os.close(slave_fd)
+
+ if isinstance(retval, int):
+ # doebuild failed before spawning
+ os.close(master_fd)
+ self.returncode = retval
+ self.wait()
+ return
+
self.pid = retval[0]
portage.process.spawned_pids.remove(self.pid)
@@ -2398,7 +2496,6 @@ class EbuildPhase(SubProcess):
else:
output_handler = self._dummy_handler
- os.close(slave_fd)
files.ebuild = os.fdopen(master_fd, 'r')
self._reg_id = self.scheduler.register(files.ebuild.fileno(),
PollConstants.POLLIN, output_handler)
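
Taken together, the two EbuildPhase hunks above move os.close(slave_fd) to immediately after doebuild() returns and add an early return when doebuild() fails before spawning (retval is an int), so the parent no longer leaks the pipe file descriptors on early failure.
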
@@ -7680,7 +7777,118 @@ class SequentialTaskQueue(SlotObject):
def __len__(self):
return len(self._task_queue) + len(self.running_tasks)
-class Scheduler(object):
+class PollLoop(object):
+
+ def __init__(self):
+ self._max_jobs = 1
+ self._max_load = None
+ self._jobs = 0
+ self._poll_event_handlers = {}
+ self._poll_event_handler_ids = {}
+ # Increment id for each new handler.
+ self._event_handler_id = 0
+ try:
+ self._poll = select.poll()
+ except AttributeError:
+ self._poll = PollSelectAdapter()
+
+ def _can_add_job(self):
+ jobs = self._jobs
+ max_jobs = self._max_jobs
+ max_load = self._max_load
+
+ if jobs >= max_jobs:
+ return False
+
+ if max_load is not None and max_jobs > 1 and jobs > 1:
+ try:
+ avg1, avg5, avg15 = os.getloadavg()
+ except OSError, e:
+ writemsg("!!! getloadavg() failed: %s\n" % (e,),
+ noiselevel=-1)
+ del e
+ return False
+
+ if avg1 >= max_load:
+ return False
+
+ return True
+
+ def _register(self, f, eventmask, handler):
+ """
+ @rtype: Integer
+ @return: A unique registration id, for use in schedule() or
+ unregister() calls.
+ """
+ self._event_handler_id += 1
+ reg_id = self._event_handler_id
+ self._poll_event_handler_ids[reg_id] = f
+ self._poll_event_handlers[f] = (handler, reg_id)
+ self._poll.register(f, eventmask)
+ return reg_id
+
+ def _unregister(self, reg_id):
+ f = self._poll_event_handler_ids[reg_id]
+ self._poll.unregister(f)
+ del self._poll_event_handlers[f]
+ del self._poll_event_handler_ids[reg_id]
+ self._schedule_tasks()
+
+ def _schedule(self, wait_id):
+ """
+ Schedule until wait_id is no longer registered
+ for poll() events.
+ @type wait_id: int
+ @param wait_id: a task id to wait for
+ """
+ event_handlers = self._poll_event_handlers
+ handler_ids = self._poll_event_handler_ids
+ poll = self._poll.poll
+
+ self._schedule_tasks()
+
+ while wait_id in handler_ids:
+ for f, event in poll():
+ handler, reg_id = event_handlers[f]
+ if not handler(f, event):
+ self._unregister(reg_id)
+
+ def _schedule_tasks(self):
+ return False
+
+ def _schedule_main(self, wait=False):
+
+ event_handlers = self._poll_event_handlers
+ poll = self._poll.poll
+ max_jobs = self._max_jobs
+
+ state_change = 0
+
+ if self._schedule_tasks():
+ state_change += 1
+
+ while event_handlers:
+ jobs = self._jobs
+
+ for f, event in poll():
+ handler, reg_id = event_handlers[f]
+ if not handler(f, event):
+ state_change += 1
+ self._unregister(reg_id)
+
+ if jobs == self._jobs:
+ continue
+
+ if self._schedule_tasks():
+ state_change += 1
+
+ if not wait and self._jobs < max_jobs:
+ break
+
+ if not state_change:
+ raise AssertionError("tight loop")
+
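
A note on the handler contract (commentary, not part of the diff): _register() files each handler under its fd, and both dispatch loops invoke handler(f, event), unregistering the handler as soon as it returns a falsy value. A hypothetical client illustrating that contract (EchoHandler is invented for illustration):

    import os
    import select
    import sys

    class EchoHandler(object):
        """Echo a pipe to stdout until EOF, then drop out of the loop."""

        def __init__(self, loop, fd):
            self.fd = fd
            # same POLLIN semantics as the PollConstants.POLLIN used above
            self.reg_id = loop._register(fd, select.POLLIN, self)

        def __call__(self, fd, event):
            data = os.read(fd, 4096)
            if not data:
                return False  # EOF: the loop calls _unregister() for us
            os.write(sys.stdout.fileno(), data)
            return True  # keep this fd registered
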
+class Scheduler(PollLoop):
_opts_ignore_blockers = \
frozenset(["--buildpkgonly",
@@ -7722,6 +7930,7 @@ class Scheduler(object):
def __init__(self, settings, trees, mtimedb, myopts,
spinner, mergelist, favorites, digraph):
+ PollLoop.__init__(self)
self.settings = settings
self.target_root = settings["ROOT"]
self.trees = trees
@@ -7758,15 +7967,6 @@ class Scheduler(object):
self._sched_iface = self._iface_class(
fetch=fetch_iface, register=self._register,
schedule=self._schedule)
- self._poll_event_handlers = {}
- self._poll_event_handler_ids = {}
- # Increment id for each new handler.
- self._event_handler_id = 0
-
- try:
- self._poll = select.poll()
- except AttributeError:
- self._poll = PollSelectAdapter()
self._task_queues = self._task_queues_class()
for k in self._task_queues.allowed_keys:
@@ -7792,7 +7992,6 @@ class Scheduler(object):
self._max_load = myopts.get("--load-average")
self._set_digraph(digraph)
- self._jobs = 0
features = self.settings.features
if "parallel-fetch" in features and \
@@ -8345,24 +8544,10 @@ class Scheduler(object):
while pkg_queue and not failed_pkgs:
- if self._jobs >= max_jobs:
+ if not self._can_add_job():
self._schedule_main()
continue
- if max_load is not None and max_jobs > 1 and self._jobs > 1:
- try:
- avg1, avg5, avg15 = os.getloadavg()
- except OSError, e:
- writemsg("!!! getloadavg() failed: %s\n" % (e,),
- noiselevel=-1)
- del e
- self._schedule_main()
- continue
-
- if avg1 >= max_load:
- self._schedule_main()
- continue
-
pkg = self._choose_pkg()
if pkg is None:
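
The effect of this hunk: the inline max-jobs and load-average checks that previously lived in the Scheduler main loop are consolidated into PollLoop._can_add_job(), so Scheduler and MetadataRegen share a single throttling policy.
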
@@ -8389,38 +8574,6 @@ class Scheduler(object):
while self._jobs:
self._schedule_main(wait=True)
- def _schedule_main(self, wait=False):
-
- event_handlers = self._poll_event_handlers
- poll = self._poll.poll
- max_jobs = self._max_jobs
-
- state_change = 0
-
- if self._schedule_tasks():
- state_change += 1
-
- while event_handlers:
- jobs = self._jobs
-
- for f, event in poll():
- handler, reg_id = event_handlers[f]
- if not handler(f, event):
- state_change += 1
- self._unregister(reg_id)
-
- if jobs == self._jobs:
- continue
-
- if self._schedule_tasks():
- state_change += 1
-
- if not wait and self._jobs < max_jobs:
- break
-
- if not state_change:
- raise AssertionError("tight loop")
-
def _schedule_tasks(self):
state_change = 0
for x in self._task_queues.values():
@@ -8524,45 +8677,6 @@ class Scheduler(object):
return True
return False
- def _register(self, f, eventmask, handler):
- """
- @rtype: Integer
- @return: A unique registration id, for use in schedule() or
- unregister() calls.
- """
- self._event_handler_id += 1
- reg_id = self._event_handler_id
- self._poll_event_handler_ids[reg_id] = f
- self._poll_event_handlers[f] = (handler, reg_id)
- self._poll.register(f, eventmask)
- return reg_id
-
- def _unregister(self, reg_id):
- f = self._poll_event_handler_ids[reg_id]
- self._poll.unregister(f)
- del self._poll_event_handlers[f]
- del self._poll_event_handler_ids[reg_id]
- self._schedule_tasks()
-
- def _schedule(self, wait_id):
- """
- Schedule until wait_id is not longer registered
- for poll() events.
- @type wait_id: int
- @param wait_id: a task id to wait for
- """
- event_handlers = self._poll_event_handlers
- handler_ids = self._poll_event_handler_ids
- poll = self._poll.poll
-
- self._schedule_tasks()
-
- while wait_id in handler_ids:
- for f, event in poll():
- handler, reg_id = event_handlers[f]
- if not handler(f, event):
- self._unregister(reg_id)
-
def _world_atom(self, pkg):
"""
Add the package to the world file, but only if
@@ -8630,6 +8744,111 @@ class Scheduler(object):
return pkg
+class MetadataRegen(PollLoop):
+
+ class _sched_iface_class(SlotObject):
+ __slots__ = ("register", "schedule")
+
+ def __init__(self, portdb, max_jobs=None, max_load=None):
+ PollLoop.__init__(self)
+ self._portdb = portdb
+
+ if max_jobs is None:
+ max_jobs = 1
+
+ self._job_queue = SequentialTaskQueue(max_jobs=max_jobs)
+ self._max_jobs = max_jobs
+ self._max_load = max_load
+ self._sched_iface = self._sched_iface_class(
+ register=self._register,
+ schedule=self._schedule)
+
+ self._valid_pkgs = set()
+
+ def _iter_metadata_processes(self):
+ portdb = self._portdb
+ valid_pkgs = self._valid_pkgs
+ every_cp = portdb.cp_all()
+ every_cp.sort(reverse=True)
+
+ while every_cp:
+ cp = every_cp.pop()
+ portage.writemsg_stdout("Processing %s\n" % cp)
+ cpv_list = portdb.cp_list(cp)
+ for cpv in cpv_list:
+ valid_pkgs.add(cpv)
+ ebuild_path, repo_path = portdb.findname2(cpv)
+ metadata_process = portdb._metadata_process(
+ cpv, ebuild_path, repo_path)
+ if metadata_process is None:
+ continue
+ yield metadata_process
+
+ def run(self):
+
+ portdb = self._portdb
+ from portage.cache.cache_errors import CacheError
+ dead_nodes = {}
+
+ for mytree in portdb.porttrees:
+ try:
+ dead_nodes[mytree] = set(portdb.auxdb[mytree].iterkeys())
+ except CacheError, e:
+ portage.writemsg("Error listing cache entries for " + \
+ "'%s': %s, continuing...\n" % (mytree, e), noiselevel=-1)
+ del e
+ dead_nodes = None
+ break
+
+ self._main_loop()
+
+ if dead_nodes:
+ for y in self._valid_pkgs:
+ for mytree in portdb.porttrees:
+ if portdb.findname2(y, mytree=mytree)[0]:
+ dead_nodes[mytree].discard(y)
+
+ for mytree, nodes in dead_nodes.iteritems():
+ auxdb = portdb.auxdb[mytree]
+ for y in nodes:
+ try:
+ del auxdb[y]
+ except (KeyError, CacheError):
+ pass
+
+ def _main_loop(self):
+
+ process_iter = self._iter_metadata_processes()
+
+ while True:
+
+ if not self._can_add_job():
+ self._schedule_main()
+ continue
+
+ try:
+ metadata_process = process_iter.next()
+ except StopIteration:
+ break
+
+ self._jobs += 1
+ metadata_process.scheduler = self._sched_iface
+ metadata_process.addExitListener(self._metadata_exit)
+ self._job_queue.add(metadata_process)
+
+ while self._jobs:
+ self._schedule_main(wait=True)
+
+ def _schedule_tasks(self):
+ return self._job_queue.schedule()
+
+ def _metadata_exit(self, metadata_process):
+ self._jobs -= 1
+ if metadata_process.returncode != os.EX_OK:
+ self._valid_pkgs.discard(metadata_process.cpv)
+ portage.writemsg("Error processing %s, continuing...\n" % \
+ (metadata_process.cpv,))
+
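
Illustrative usage (commentary, not part of the diff; assumes a configured portdbapi instance named portdb): regenerate the metadata cache with up to four concurrent "depend" phases, backing off whenever the 1-minute load average reaches 3.0:

    regen = MetadataRegen(portdb, max_jobs=4, max_load=3.0)
    regen.run()
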
class UninstallFailure(portage.exception.PortageException):
"""
An instance of this class is raised by unmerge() when
@@ -9934,7 +10153,7 @@ def action_metadata(settings, portdb, myopts):
sys.stdout.flush()
os.umask(old_umask)
-def action_regen(settings, portdb):
+def action_regen(settings, portdb, max_jobs, max_load):
xterm_titles = "notitles" not in settings.features
emergelog(xterm_titles, " === regen")
#regenerate cache entries
@@ -9946,40 +10165,10 @@ def action_regen(settings, portdb):
except:
pass
sys.stdout.flush()
- mynodes = portdb.cp_all()
- from portage.cache.cache_errors import CacheError
- dead_nodes = {}
- for mytree in portdb.porttrees:
- try:
- dead_nodes[mytree] = set(portdb.auxdb[mytree].iterkeys())
- except CacheError, e:
- portage.writemsg("Error listing cache entries for " + \
- "'%s': %s, continuing...\n" % (mytree, e), noiselevel=-1)
- del e
- dead_nodes = None
- break
- for x in mynodes:
- mymatches = portdb.cp_list(x)
- portage.writemsg_stdout("Processing %s\n" % x)
- for y in mymatches:
- try:
- foo = portdb.aux_get(y,["DEPEND"])
- except (KeyError, portage.exception.PortageException), e:
- portage.writemsg(
- "Error processing %(cpv)s, continuing... (%(e)s)\n" % \
- {"cpv":y,"e":str(e)}, noiselevel=-1)
- if dead_nodes:
- for mytree in portdb.porttrees:
- if portdb.findname2(y, mytree=mytree)[0]:
- dead_nodes[mytree].discard(y)
- if dead_nodes:
- for mytree, nodes in dead_nodes.iteritems():
- auxdb = portdb.auxdb[mytree]
- for y in nodes:
- try:
- del auxdb[y]
- except (KeyError, CacheError):
- pass
+
+ regen = MetadataRegen(portdb, max_jobs=max_jobs, max_load=max_load)
+ regen.run()
+
portage.writemsg_stdout("done!\n")
def action_config(settings, trees, myopts, myfiles):
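
With action_regen() delegating to MetadataRegen, emerge --regen can now generate cache entries in parallel; judging from the option lookups above, an invocation like emerge --regen --jobs=4 --load-average=3 would apply the same throttling semantics as the build scheduler.
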
@@ -11408,6 +11597,7 @@ def adjust_config(myopts, settings):
def emerge_main():
global portage # NFC why this is necessary now - genone
+ portage._disable_legacy_globals()
# Disable color until we're sure that it should be enabled (after
# EMERGE_DEFAULT_OPTS has been parsed).
portage.output.havecolor = 0
@@ -11789,7 +11979,8 @@ def emerge_main():
action_metadata(settings, portdb, myopts)
elif myaction=="regen":
validate_ebuild_environment(trees)
- action_regen(settings, portdb)
+ action_regen(settings, portdb, myopts.get("--jobs"),
+ myopts.get("--load-average"))
# HELP action
elif "config"==myaction:
validate_ebuild_environment(trees)