diff options
author | Zac Medico <zmedico@gentoo.org> | 2012-10-18 18:15:00 -0700 |
---|---|---|
committer | Zac Medico <zmedico@gentoo.org> | 2012-10-18 18:15:00 -0700 |
commit | 5a7690fd10a6935e756495590c9c2b6a19aa8139 (patch) | |
tree | 939b8bd26561d2d70057efbdc8f9c8b61e2203f2 /pym | |
parent | c34b00b7b402b199cc6b7db1a7d1d4547ef3b413 (diff) | |
download | portage-5a7690fd10a6935e756495590c9c2b6a19aa8139.tar.gz portage-5a7690fd10a6935e756495590c9c2b6a19aa8139.tar.bz2 portage-5a7690fd10a6935e756495590c9c2b6a19aa8139.zip |
SpawnProcess: split out a PipeLogger class
The copyright dates for these classes begin in 2008, since SpawnProcess
code is derived from the EbuildFetcherAsync class which was added in
commit e4edadf5ae7063f375d76be151c6d0e949980ecf in 2008.
Diffstat (limited to 'pym')
-rw-r--r-- | pym/_emerge/SpawnProcess.py | 165 | ||||
-rw-r--r-- | pym/portage/util/_async/PipeLogger.py | 149 |
2 files changed, 183 insertions, 131 deletions
diff --git a/pym/_emerge/SpawnProcess.py b/pym/_emerge/SpawnProcess.py index ab152c3c3..d18512b34 100644 --- a/pym/_emerge/SpawnProcess.py +++ b/pym/_emerge/SpawnProcess.py @@ -1,17 +1,12 @@ -# Copyright 1999-2012 Gentoo Foundation +# Copyright 2008-2012 Gentoo Foundation # Distributed under the terms of the GNU General Public License v2 from _emerge.SubProcess import SubProcess import sys -from portage.cache.mappings import slot_dict_class import portage -from portage import _encodings -from portage import _unicode_encode from portage import os from portage.const import BASH_BINARY -import fcntl -import errno -import gzip +from portage.util._async.PipeLogger import PipeLogger class SpawnProcess(SubProcess): @@ -26,10 +21,7 @@ class SpawnProcess(SubProcess): "path_lookup", "pre_exec") __slots__ = ("args",) + \ - _spawn_kwarg_names + ("_log_file_real", "_selinux_type",) - - _file_names = ("log", "process", "stdout") - _files_dict = slot_dict_class(_file_names, prefix="") + _spawn_kwarg_names + ("_pipe_logger", "_selinux_type",) def _start(self): @@ -37,17 +29,13 @@ class SpawnProcess(SubProcess): self.fd_pipes = {} fd_pipes = self.fd_pipes - self._files = self._files_dict() - files = self._files - master_fd, slave_fd = self._pipe(fd_pipes) - fcntl.fcntl(master_fd, fcntl.F_SETFL, - fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK) - files.process = master_fd - logfile = None - if self._can_log(slave_fd): - logfile = self.logfile + can_log = self._can_log(slave_fd) + if can_log: + log_file_path = self.logfile + else: + log_file_path = None null_input = None if not self.background or 0 in fd_pipes: @@ -74,37 +62,19 @@ class SpawnProcess(SubProcess): sys.__stderr__.flush() break - if logfile is not None: + fd_pipes_orig = fd_pipes.copy() - fd_pipes_orig = fd_pipes.copy() + if log_file_path is not None: fd_pipes[1] = slave_fd fd_pipes[2] = slave_fd - files.log = open(_unicode_encode(logfile, - encoding=_encodings['fs'], errors='strict'), mode='ab') - if logfile.endswith('.gz'): - self._log_file_real = files.log - files.log = gzip.GzipFile(filename='', mode='ab', - fileobj=files.log) - - portage.util.apply_secpass_permissions(logfile, - uid=portage.portage_uid, gid=portage.portage_gid, - mode=0o660) - - if not self.background: - files.stdout = os.dup(fd_pipes_orig[1]) - - output_handler = self._output_handler - else: - # Create a dummy pipe so the scheduler can monitor # the process from inside a poll() loop. fd_pipes[self._dummy_pipe_fd] = slave_fd if self.background: fd_pipes[1] = slave_fd fd_pipes[2] = slave_fd - output_handler = self._dummy_handler kwargs = {} for k in self._spawn_kwarg_names: @@ -116,10 +86,6 @@ class SpawnProcess(SubProcess): kwargs["returnpid"] = True kwargs.pop("logfile", None) - self._reg_id = self.scheduler.io_add_watch(files.process, - self._registered_events, output_handler) - self._registered = True - retval = self._spawn(self.args, **kwargs) os.close(slave_fd) @@ -136,6 +102,18 @@ class SpawnProcess(SubProcess): self.pid = retval[0] portage.process.spawned_pids.remove(self.pid) + stdout_fd = None + if can_log and not self.background: + stdout_fd = os.dup(fd_pipes_orig[1]) + + self._pipe_logger = PipeLogger(background=self.background, + scheduler=self.scheduler, input_fd=master_fd, + log_file_path=log_file_path, + stdout_fd=stdout_fd) + self._pipe_logger.addExitListener(self._pipe_logger_exit) + self._pipe_logger.start() + self._registered = True + def _can_log(self, slave_fd): return True @@ -158,92 +136,17 @@ class SpawnProcess(SubProcess): return spawn_func(args, **kwargs) - def _output_handler(self, fd, event): - - files = self._files - while True: - buf = self._read_buf(fd, event) - - if buf is None: - # not a POLLIN event, EAGAIN, etc... - break + def _pipe_logger_exit(self, pipe_logger): + self._pipe_logger = None + self._unregister() + self.wait() - if not buf: - # EOF - self._unregister() - self.wait() - break - - else: - if not self.background: - write_successful = False - failures = 0 - while True: - try: - if not write_successful: - os.write(files.stdout, buf) - write_successful = True - break - except OSError as e: - if e.errno != errno.EAGAIN: - raise - del e - failures += 1 - if failures > 50: - # Avoid a potentially infinite loop. In - # most cases, the failure count is zero - # and it's unlikely to exceed 1. - raise - - # This means that a subprocess has put an inherited - # stdio file descriptor (typically stdin) into - # O_NONBLOCK mode. This is not acceptable (see bug - # #264435), so revert it. We need to use a loop - # here since there's a race condition due to - # parallel processes being able to change the - # flags on the inherited file descriptor. - # TODO: When possible, avoid having child processes - # inherit stdio file descriptors from portage - # (maybe it can't be avoided with - # PROPERTIES=interactive). - fcntl.fcntl(files.stdout, fcntl.F_SETFL, - fcntl.fcntl(files.stdout, - fcntl.F_GETFL) ^ os.O_NONBLOCK) - - files.log.write(buf) - files.log.flush() - - self._unregister_if_appropriate(event) - - return True - - def _dummy_handler(self, fd, event): - """ - This method is mainly interested in detecting EOF, since - the only purpose of the pipe is to allow the scheduler to - monitor the process from inside a poll() loop. - """ - - while True: - buf = self._read_buf(fd, event) - - if buf is None: - # not a POLLIN event, EAGAIN, etc... - break - - if not buf: - # EOF - self._unregister() - self.wait() - break - - self._unregister_if_appropriate(event) - - return True + def _waitpid_loop(self): + SubProcess._waitpid_loop(self) - def _unregister(self): - super(SpawnProcess, self)._unregister() - if self._log_file_real is not None: - # Avoid "ResourceWarning: unclosed file" since python 3.2. - self._log_file_real.close() - self._log_file_real = None + pipe_logger = self._pipe_logger + if pipe_logger is not None: + self._pipe_logger = None + pipe_logger.removeExitListener(self._pipe_logger_exit) + pipe_logger.cancel() + pipe_logger.wait() diff --git a/pym/portage/util/_async/PipeLogger.py b/pym/portage/util/_async/PipeLogger.py new file mode 100644 index 000000000..dbdd56f2a --- /dev/null +++ b/pym/portage/util/_async/PipeLogger.py @@ -0,0 +1,149 @@ +# Copyright 2008-2012 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import fcntl +import errno +import gzip + +import portage +from portage import os, _encodings, _unicode_encode +from _emerge.AbstractPollTask import AbstractPollTask + +class PipeLogger(AbstractPollTask): + + """ + This can be used for logging output of a child process, + optionally outputing to log_file_path and/or stdout_fd. It can + also monitor for EOF on input_fd, which may be used to detect + termination of a child process. If log_file_path ends with + '.gz' then the log file is written with compression. + """ + + __slots__ = ("input_fd", "log_file_path", "stdout_fd") + \ + ("_log_file", "_log_file_real", "_reg_id") + + def _start(self): + + log_file_path = self.log_file_path + if log_file_path is not None: + + self._log_file = open(_unicode_encode(log_file_path, + encoding=_encodings['fs'], errors='strict'), mode='ab') + if log_file_path.endswith('.gz'): + self._log_file_real = self._log_file + self._log_file = gzip.GzipFile(filename='', mode='ab', + fileobj=self._log_file) + + portage.util.apply_secpass_permissions(log_file_path, + uid=portage.portage_uid, gid=portage.portage_gid, + mode=0o660) + + fcntl.fcntl(self.input_fd, fcntl.F_SETFL, + fcntl.fcntl(self.input_fd, fcntl.F_GETFL) | os.O_NONBLOCK) + + self._reg_id = self.scheduler.io_add_watch(self.input_fd, + self._registered_events, self._output_handler) + self._registered = True + + def isAlive(self): + return self._registered + + def _cancel(self): + self._unregister() + if self.returncode is None: + self.returncode = self._cancelled_returncode + + def _wait(self): + if self.returncode is not None: + return self.returncode + self._wait_loop() + self.returncode = os.EX_OK + return self.returncode + + def _output_handler(self, fd, event): + + background = self.background + stdout_fd = self.stdout_fd + log_file = self._log_file + + while True: + buf = self._read_buf(fd, event) + + if buf is None: + # not a POLLIN event, EAGAIN, etc... + break + + if not buf: + # EOF + self._unregister() + self.wait() + break + + else: + if not background and stdout_fd is not None: + write_successful = False + failures = 0 + while True: + try: + if not write_successful: + os.write(stdout_fd, buf) + write_successful = True + break + except OSError as e: + if e.errno != errno.EAGAIN: + raise + del e + failures += 1 + if failures > 50: + # Avoid a potentially infinite loop. In + # most cases, the failure count is zero + # and it's unlikely to exceed 1. + raise + + # This means that a subprocess has put an inherited + # stdio file descriptor (typically stdin) into + # O_NONBLOCK mode. This is not acceptable (see bug + # #264435), so revert it. We need to use a loop + # here since there's a race condition due to + # parallel processes being able to change the + # flags on the inherited file descriptor. + # TODO: When possible, avoid having child processes + # inherit stdio file descriptors from portage + # (maybe it can't be avoided with + # PROPERTIES=interactive). + fcntl.fcntl(stdout_fd, fcntl.F_SETFL, + fcntl.fcntl(stdout_fd, + fcntl.F_GETFL) ^ os.O_NONBLOCK) + + if log_file is not None: + log_file.write(buf) + log_file.flush() + + self._unregister_if_appropriate(event) + + return True + + def _unregister(self): + + if self._reg_id is not None: + self.scheduler.source_remove(self._reg_id) + self._reg_id = None + + if self.input_fd is not None: + os.close(self.input_fd) + self.input_fd = None + + if self.stdout_fd is not None: + os.close(self.stdout_fd) + self.stdout_fd = None + + if self._log_file is not None: + self._log_file.close() + self._log_file = None + + if self._log_file_real is not None: + # Avoid "ResourceWarning: unclosed file" since python 3.2. + self._log_file_real.close() + self._log_file_real = None + + self._registered = False |