From bfb69d2ca63d5c8aecf2c7be008d4971ea57e347 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Thu, 3 Jul 2008 03:39:06 +0000 Subject: Split out a SpawnProcess base class out of EbuildFetcherAsync. svn path=/main/trunk/; revision=10898 --- pym/_emerge/__init__.py | 112 ++++++++++++++++++++++++++++++++---------------- 1 file changed, 74 insertions(+), 38 deletions(-) diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py index 66d06b370..e46a8a124 100644 --- a/pym/_emerge/__init__.py +++ b/pym/_emerge/__init__.py @@ -1495,13 +1495,22 @@ class SubProcess(SlotObject): self.returncode = retval -class EbuildFetcherAsync(SubProcess): +class SpawnProcess(SubProcess): - __slots__ = ("log_file", "fd_pipes", "pkg", - "register", "unregister", - "files") + """ + Constructor keyword args are passed into portage.process.spawn(). + The required "args" keyword argument will be passed as the first + spawn() argument. + """ - _file_names = ("fetcher", "out") + _spawn_kwarg_names = ("env", "opt_name", "fd_pipes", + "uid", "gid", "groups", "umask", "logfile", + "path_lookup", "pre_exec") + + __slots__ = ("args", "files", "register", "unregister", "registered") + \ + _spawn_kwarg_names + + _file_names = ("process", "out") _files_dict = slot_dict_class(_file_names, prefix="") _bufsize = 4096 @@ -1519,27 +1528,26 @@ class EbuildFetcherAsync(SubProcess): 2 : sys.stderr.fileno(), } - log_file = self.log_file + logfile = self.logfile self.files = self._files_dict() files = self.files - if log_file is not None: - files.out = open(log_file, "a") - portage.util.apply_secpass_permissions(log_file, + if logfile is not None: + files.out = open(logfile, "a") + portage.util.apply_secpass_permissions(logfile, uid=portage.portage_uid, gid=portage.portage_gid, mode=0660) else: + fd_pipes.setdefault(1, sys.stdout.fileno()) for fd in fd_pipes.itervalues(): if fd == sys.stdout.fileno(): sys.stdout.flush() if fd == sys.stderr.fileno(): sys.stderr.flush() - files.out = os.fdopen(os.dup(fd_pipes[1]), 'w') master_fd, slave_fd = os.pipe() - import fcntl fcntl.fcntl(master_fd, fcntl.F_SETFL, fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK) @@ -1549,48 +1557,70 @@ class EbuildFetcherAsync(SubProcess): fd_pipes[1] = slave_fd fd_pipes[2] = slave_fd - root_config = self.pkg.root_config - portdb = root_config.trees["porttree"].dbapi - ebuild_path = portdb.findname(self.pkg.cpv) - settings = root_config.settings + kwargs = {} + for k in self._spawn_kwarg_names: + v = getattr(self, k) + if v is not None: + kwargs[k] = v - fetch_env = dict((k, settings[k]) for k in settings) - fetch_env["FEATURES"] = fetch_env.get("FEATURES", "") + " -cvs" - fetch_env["PORTAGE_NICENESS"] = "0" - fetch_env["PORTAGE_PARALLEL_FETCHONLY"] = "1" - - ebuild_binary = os.path.join( - settings["EBUILD_BIN_PATH"], "ebuild") - - fetch_args = [ebuild_binary, ebuild_path, "fetch"] - debug = settings.get("PORTAGE_DEBUG") == "1" - if debug: - fetch_args.append("--debug") + kwargs["fd_pipes"] = fd_pipes + kwargs["returnpid"] = True + del kwargs["logfile"] - retval = portage.process.spawn(fetch_args, env=fetch_env, - fd_pipes=fd_pipes, returnpid=True) + retval = portage.process.spawn(self.args, **kwargs) self.pid = retval[0] os.close(slave_fd) - files.fetcher = os.fdopen(master_fd, 'r') - self.register(files.fetcher.fileno(), + files.process = os.fdopen(master_fd, 'r') + self.registered = True + self.register(files.process.fileno(), select.POLLIN, self._output_handler) def _output_handler(self, fd, event): files = self.files buf = array.array('B') try: - buf.fromfile(files.fetcher, self._bufsize) + buf.fromfile(files.process, self._bufsize) except EOFError: pass if buf: buf.tofile(files.out) files.out.flush() else: - self.unregister(files.fetcher.fileno()) + fd = files.process.fileno() for f in files.values(): + f.flush() f.close() + self.registered = False + self.unregister(fd) + +class EbuildFetcherAsync(SpawnProcess): + + __slots__ = ("pkg",) + + def start(self): + + root_config = self.pkg.root_config + portdb = root_config.trees["porttree"].dbapi + ebuild_path = portdb.findname(self.pkg.cpv) + settings = root_config.settings + + fetch_env = settings.environ() + fetch_env["PORTAGE_NICENESS"] = "0" + fetch_env["PORTAGE_PARALLEL_FETCHONLY"] = "1" + + ebuild_binary = os.path.join( + settings["EBUILD_BIN_PATH"], "ebuild") + + fetch_args = [ebuild_binary, ebuild_path, "fetch"] + debug = settings.get("PORTAGE_DEBUG") == "1" + if debug: + fetch_args.append("--debug") + + self.args = fetch_args + self.env = fetch_env + SpawnProcess.start(self) class EbuildBuildDir(SlotObject): @@ -1848,7 +1878,7 @@ class EbuildPhase(SubProcess): __slots__ = ("fd_pipes", "phase", "pkg", "register", "settings", "unregister", - "files") + "files", "registered") _file_names = ("log", "stdout", "ebuild") _files_dict = slot_dict_class(_file_names, prefix="") @@ -1929,6 +1959,7 @@ class EbuildPhase(SubProcess): files.log = open(logfile, 'a') files.stdout = os.fdopen(os.dup(fd_pipes_orig[1]), 'w') files.ebuild = os.fdopen(master_fd, 'r') + self.registered = True self.register(files.ebuild.fileno(), select.POLLIN, self._output_handler) @@ -1945,9 +1976,11 @@ class EbuildPhase(SubProcess): buf.tofile(files.log) files.log.flush() else: - self.unregister(files.ebuild.fileno()) + fd = files.ebuild.fileno() for f in files.values(): f.close() + self.registered = False + self.unregister(fd) def _set_returncode(self, wait_retval): SubProcess._set_returncode(self, wait_retval) @@ -2309,6 +2342,7 @@ class BinpkgFetcherAsync(SubProcess): os.close(slave_fd) files.fetcher = os.fdopen(master_fd, 'r') + self.registered = True self.register(files.fetcher.fileno(), select.POLLIN, self._output_handler) @@ -2323,11 +2357,13 @@ class BinpkgFetcherAsync(SubProcess): buf.tofile(files.out) files.out.flush() else: - self.unregister(files.fetcher.fileno()) + fd = files.fetcher.fileno() for f in files.values(): f.close() if self.locked: self.unlock() + self.registered = False + self.unregister(fd) def lock(self): """ @@ -7146,7 +7182,7 @@ class Scheduler(object): state_changed = False for task in list(running_tasks): - if task.poll() is not None: + if not task.registered and task.poll() is not None: running_tasks.remove(task) state_changed = True @@ -7196,7 +7232,7 @@ class Scheduler(object): continue if pkg.type_name == "ebuild": self._add_task(EbuildFetcherAsync( - log_file=self._fetch_log, + logfile=self._fetch_log, pkg=pkg, register=self._register, unregister=self._unregister)) elif pkg.type_name == "binary" and getbinpkg and \ -- cgit v1.2.3-1-g7c22