From 5a7690fd10a6935e756495590c9c2b6a19aa8139 Mon Sep 17 00:00:00 2001
From: Zac Medico
Date: Thu, 18 Oct 2012 18:15:00 -0700
Subject: SpawnProcess: split out a PipeLogger class

The copyright dates for these classes begin in 2008, since SpawnProcess
code is derived from the EbuildFetcherAsync class which was added in
commit e4edadf5ae7063f375d76be151c6d0e949980ecf in 2008.
---
 pym/_emerge/SpawnProcess.py           | 165 +++++++---------------------
 pym/portage/util/_async/PipeLogger.py | 149 ++++++++++++++++++++++++++++++
 2 files changed, 183 insertions(+), 131 deletions(-)
 create mode 100644 pym/portage/util/_async/PipeLogger.py

diff --git a/pym/_emerge/SpawnProcess.py b/pym/_emerge/SpawnProcess.py
index ab152c3c3..d18512b34 100644
--- a/pym/_emerge/SpawnProcess.py
+++ b/pym/_emerge/SpawnProcess.py
@@ -1,17 +1,12 @@
-# Copyright 1999-2012 Gentoo Foundation
+# Copyright 2008-2012 Gentoo Foundation
 # Distributed under the terms of the GNU General Public License v2
 
 from _emerge.SubProcess import SubProcess
 import sys
-from portage.cache.mappings import slot_dict_class
 import portage
-from portage import _encodings
-from portage import _unicode_encode
 from portage import os
 from portage.const import BASH_BINARY
-import fcntl
-import errno
-import gzip
+from portage.util._async.PipeLogger import PipeLogger
 
 class SpawnProcess(SubProcess):
 
@@ -26,10 +21,7 @@ class SpawnProcess(SubProcess):
 		"path_lookup", "pre_exec")
 
 	__slots__ = ("args",) + \
-		_spawn_kwarg_names + ("_log_file_real", "_selinux_type",)
-
-	_file_names = ("log", "process", "stdout")
-	_files_dict = slot_dict_class(_file_names, prefix="")
+		_spawn_kwarg_names + ("_pipe_logger", "_selinux_type",)
 
 	def _start(self):
 
@@ -37,17 +29,13 @@ class SpawnProcess(SubProcess):
 			self.fd_pipes = {}
 		fd_pipes = self.fd_pipes
 
-		self._files = self._files_dict()
-		files = self._files
-
 		master_fd, slave_fd = self._pipe(fd_pipes)
-		fcntl.fcntl(master_fd, fcntl.F_SETFL,
-			fcntl.fcntl(master_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
-		files.process = master_fd
 
-		logfile = None
-		if self._can_log(slave_fd):
-			logfile = self.logfile
+		can_log = self._can_log(slave_fd)
+		if can_log:
+			log_file_path = self.logfile
+		else:
+			log_file_path = None
 
 		null_input = None
 		if not self.background or 0 in fd_pipes:
@@ -74,37 +62,19 @@ class SpawnProcess(SubProcess):
 				sys.__stderr__.flush()
 				break
 
-		if logfile is not None:
+		fd_pipes_orig = fd_pipes.copy()
 
-			fd_pipes_orig = fd_pipes.copy()
+		if log_file_path is not None:
 			fd_pipes[1] = slave_fd
 			fd_pipes[2] = slave_fd
 
-			files.log = open(_unicode_encode(logfile,
-				encoding=_encodings['fs'], errors='strict'), mode='ab')
-			if logfile.endswith('.gz'):
-				self._log_file_real = files.log
-				files.log = gzip.GzipFile(filename='', mode='ab',
-					fileobj=files.log)
-
-			portage.util.apply_secpass_permissions(logfile,
-				uid=portage.portage_uid, gid=portage.portage_gid,
-				mode=0o660)
-
-			if not self.background:
-				files.stdout = os.dup(fd_pipes_orig[1])
-
-			output_handler = self._output_handler
-
 		else:
-
 			# Create a dummy pipe so the scheduler can monitor
 			# the process from inside a poll() loop.
 			fd_pipes[self._dummy_pipe_fd] = slave_fd
 			if self.background:
 				fd_pipes[1] = slave_fd
 				fd_pipes[2] = slave_fd
-			output_handler = self._dummy_handler
 
 		kwargs = {}
 		for k in self._spawn_kwarg_names:
@@ -116,10 +86,6 @@ class SpawnProcess(SubProcess):
 		kwargs["returnpid"] = True
 		kwargs.pop("logfile", None)
 
-		self._reg_id = self.scheduler.io_add_watch(files.process,
-			self._registered_events, output_handler)
-		self._registered = True
-
 		retval = self._spawn(self.args, **kwargs)
 
 		os.close(slave_fd)
@@ -136,6 +102,18 @@ class SpawnProcess(SubProcess):
 		self.pid = retval[0]
 		portage.process.spawned_pids.remove(self.pid)
 
+		stdout_fd = None
+		if can_log and not self.background:
+			stdout_fd = os.dup(fd_pipes_orig[1])
+
+		self._pipe_logger = PipeLogger(background=self.background,
+			scheduler=self.scheduler, input_fd=master_fd,
+			log_file_path=log_file_path,
+			stdout_fd=stdout_fd)
+		self._pipe_logger.addExitListener(self._pipe_logger_exit)
+		self._pipe_logger.start()
+		self._registered = True
+
 	def _can_log(self, slave_fd):
 		return True
 
@@ -158,92 +136,17 @@ class SpawnProcess(SubProcess):
 
 		return spawn_func(args, **kwargs)
 
-	def _output_handler(self, fd, event):
-
-		files = self._files
-		while True:
-			buf = self._read_buf(fd, event)
-
-			if buf is None:
-				# not a POLLIN event, EAGAIN, etc...
-				break
+	def _pipe_logger_exit(self, pipe_logger):
+		self._pipe_logger = None
+		self._unregister()
+		self.wait()
 
-			if not buf:
-				# EOF
-				self._unregister()
-				self.wait()
-				break
-
-			else:
-				if not self.background:
-					write_successful = False
-					failures = 0
-					while True:
-						try:
-							if not write_successful:
-								os.write(files.stdout, buf)
-								write_successful = True
-							break
-						except OSError as e:
-							if e.errno != errno.EAGAIN:
-								raise
-							del e
-							failures += 1
-							if failures > 50:
-								# Avoid a potentially infinite loop. In
-								# most cases, the failure count is zero
-								# and it's unlikely to exceed 1.
-								raise
-
-							# This means that a subprocess has put an inherited
-							# stdio file descriptor (typically stdin) into
-							# O_NONBLOCK mode. This is not acceptable (see bug
-							# #264435), so revert it. We need to use a loop
-							# here since there's a race condition due to
-							# parallel processes being able to change the
-							# flags on the inherited file descriptor.
-							# TODO: When possible, avoid having child processes
-							# inherit stdio file descriptors from portage
-							# (maybe it can't be avoided with
-							# PROPERTIES=interactive).
-							fcntl.fcntl(files.stdout, fcntl.F_SETFL,
-								fcntl.fcntl(files.stdout,
-								fcntl.F_GETFL) ^ os.O_NONBLOCK)
-
-				files.log.write(buf)
-				files.log.flush()
-
-		self._unregister_if_appropriate(event)
-
-		return True
-
-	def _dummy_handler(self, fd, event):
-		"""
-		This method is mainly interested in detecting EOF, since
-		the only purpose of the pipe is to allow the scheduler to
-		monitor the process from inside a poll() loop.
-		"""
-
-		while True:
-			buf = self._read_buf(fd, event)
-
-			if buf is None:
-				# not a POLLIN event, EAGAIN, etc...
-				break
-
-			if not buf:
-				# EOF
-				self._unregister()
-				self.wait()
-				break
-
-		self._unregister_if_appropriate(event)
-
-		return True
+	def _waitpid_loop(self):
+		SubProcess._waitpid_loop(self)
 
-	def _unregister(self):
-		super(SpawnProcess, self)._unregister()
-		if self._log_file_real is not None:
-			# Avoid "ResourceWarning: unclosed file" since python 3.2.
-			self._log_file_real.close()
-			self._log_file_real = None
+		pipe_logger = self._pipe_logger
+		if pipe_logger is not None:
+			self._pipe_logger = None
+			pipe_logger.removeExitListener(self._pipe_logger_exit)
+			pipe_logger.cancel()
+			pipe_logger.wait()
diff --git a/pym/portage/util/_async/PipeLogger.py b/pym/portage/util/_async/PipeLogger.py
new file mode 100644
index 000000000..dbdd56f2a
--- /dev/null
+++ b/pym/portage/util/_async/PipeLogger.py
@@ -0,0 +1,149 @@
+# Copyright 2008-2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import fcntl
+import errno
+import gzip
+
+import portage
+from portage import os, _encodings, _unicode_encode
+from _emerge.AbstractPollTask import AbstractPollTask
+
+class PipeLogger(AbstractPollTask):
+
+	"""
+	This can be used for logging output of a child process,
+	optionally outputing to log_file_path and/or stdout_fd. It can
+	also monitor for EOF on input_fd, which may be used to detect
+	termination of a child process. If log_file_path ends with
+	'.gz' then the log file is written with compression.
+	"""
+
+	__slots__ = ("input_fd", "log_file_path", "stdout_fd") + \
+		("_log_file", "_log_file_real", "_reg_id")
+
+	def _start(self):
+
+		log_file_path = self.log_file_path
+		if log_file_path is not None:
+
+			self._log_file = open(_unicode_encode(log_file_path,
+				encoding=_encodings['fs'], errors='strict'), mode='ab')
+			if log_file_path.endswith('.gz'):
+				self._log_file_real = self._log_file
+				self._log_file = gzip.GzipFile(filename='', mode='ab',
+					fileobj=self._log_file)
+
+			portage.util.apply_secpass_permissions(log_file_path,
+				uid=portage.portage_uid, gid=portage.portage_gid,
+				mode=0o660)
+
+		fcntl.fcntl(self.input_fd, fcntl.F_SETFL,
+			fcntl.fcntl(self.input_fd, fcntl.F_GETFL) | os.O_NONBLOCK)
+
+		self._reg_id = self.scheduler.io_add_watch(self.input_fd,
+			self._registered_events, self._output_handler)
+		self._registered = True
+
+	def isAlive(self):
+		return self._registered
+
+	def _cancel(self):
+		self._unregister()
+		if self.returncode is None:
+			self.returncode = self._cancelled_returncode
+
+	def _wait(self):
+		if self.returncode is not None:
+			return self.returncode
+		self._wait_loop()
+		self.returncode = os.EX_OK
+		return self.returncode
+
+	def _output_handler(self, fd, event):
+
+		background = self.background
+		stdout_fd = self.stdout_fd
+		log_file = self._log_file
+
+		while True:
+			buf = self._read_buf(fd, event)
+
+			if buf is None:
+				# not a POLLIN event, EAGAIN, etc...
+				break
+
+			if not buf:
+				# EOF
+				self._unregister()
+				self.wait()
+				break
+
+			else:
+				if not background and stdout_fd is not None:
+					write_successful = False
+					failures = 0
+					while True:
+						try:
+							if not write_successful:
+								os.write(stdout_fd, buf)
+								write_successful = True
+							break
+						except OSError as e:
+							if e.errno != errno.EAGAIN:
+								raise
+							del e
+							failures += 1
+							if failures > 50:
+								# Avoid a potentially infinite loop. In
+								# most cases, the failure count is zero
+								# and it's unlikely to exceed 1.
+								raise
+
+							# This means that a subprocess has put an inherited
+							# stdio file descriptor (typically stdin) into
+							# O_NONBLOCK mode. This is not acceptable (see bug
+							# #264435), so revert it. We need to use a loop
+							# here since there's a race condition due to
+							# parallel processes being able to change the
+							# flags on the inherited file descriptor.
+							# TODO: When possible, avoid having child processes
+							# inherit stdio file descriptors from portage
+							# (maybe it can't be avoided with
+							# PROPERTIES=interactive).
+							fcntl.fcntl(stdout_fd, fcntl.F_SETFL,
+								fcntl.fcntl(stdout_fd,
+								fcntl.F_GETFL) ^ os.O_NONBLOCK)
+
+				if log_file is not None:
+					log_file.write(buf)
+					log_file.flush()
+
+		self._unregister_if_appropriate(event)
+
+		return True
+
+	def _unregister(self):
+
+		if self._reg_id is not None:
+			self.scheduler.source_remove(self._reg_id)
+			self._reg_id = None
+
+		if self.input_fd is not None:
+			os.close(self.input_fd)
+			self.input_fd = None
+
+		if self.stdout_fd is not None:
+			os.close(self.stdout_fd)
+			self.stdout_fd = None
+
+		if self._log_file is not None:
+			self._log_file.close()
+			self._log_file = None
+
+		if self._log_file_real is not None:
+			# Avoid "ResourceWarning: unclosed file" since python 3.2.
+			self._log_file_real.close()
+			self._log_file_real = None
+
+		self._registered = False
-- 
cgit v1.2.3-1-g7c22
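Usage sketch (not part of the commit itself): the snippet below shows how the new PipeLogger class is driven, mirroring the wiring added to SpawnProcess._start() above. The attach_logger() helper and its arguments are hypothetical names chosen for illustration; the scheduler is assumed to be portage's event loop object, i.e. anything providing the io_add_watch()/source_remove() interface that AbstractPollTask relies on.

# A minimal sketch, assuming "scheduler" is portage's event loop
# (PollScheduler/EventLoop in this era). attach_logger() is a
# hypothetical helper, not part of the patch.
import os

from portage.util._async.PipeLogger import PipeLogger

def attach_logger(scheduler, master_fd, log_file_path=None, background=False):
	# Echo to the terminal only when logging to a file in the
	# foreground, similar to what SpawnProcess does with
	# os.dup(fd_pipes_orig[1]).
	stdout_fd = None
	if log_file_path is not None and not background:
		stdout_fd = os.dup(1)

	logger = PipeLogger(background=background,
		scheduler=scheduler, input_fd=master_fd,
		log_file_path=log_file_path, stdout_fd=stdout_fd)
	# Exit listeners fire once the logger sees EOF on master_fd (or is
	# cancelled); SpawnProcess uses this to unregister and wait().
	logger.addExitListener(lambda logger_task: None)
	logger.start()
	return logger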