From e489dcef3fd91ecf4a0960bdd7ed1de0a992f72b Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Fri, 4 Jan 2013 05:22:48 -0800 Subject: ebuild-ipc: add FifoWriter class --- bin/ebuild-ipc.py | 79 +++++++++++++++---------------------------------------- 1 file changed, 21 insertions(+), 58 deletions(-) diff --git a/bin/ebuild-ipc.py b/bin/ebuild-ipc.py index 4046d8d25..d351e9454 100755 --- a/bin/ebuild-ipc.py +++ b/bin/ebuild-ipc.py @@ -12,7 +12,6 @@ import platform import signal import sys import time -import traceback def debug_signal(signum, frame): import pdb @@ -39,9 +38,20 @@ import portage portage._internal_caller = True portage._disable_legacy_globals() +from portage.util._async.ForkProcess import ForkProcess from portage.util._eventloop.global_event_loop import global_event_loop from _emerge.PipeReader import PipeReader +class FifoWriter(ForkProcess): + + __slots__ = ('buf', 'fifo',) + + def _run(self): + # Atomically write the whole buffer into the fifo. + with open(self.fifo, 'wb', 0) as f: + f.write(self.buf) + return os.EX_OK + class EbuildIpc(object): # Timeout for each individual communication attempt (we retry @@ -90,7 +100,7 @@ class EbuildIpc(object): 'ebuild-ipc: daemon process not detected\n'), level=logging.ERROR, noiselevel=-1) - def _wait(self, pid, pr, msg): + def _run_writer(self, fifo_writer, msg): """ Wait on pid and return an appropriate exit code. This may return unsuccessfully due to timeout if the daemon @@ -99,49 +109,24 @@ class EbuildIpc(object): start_time = time.time() - pipe_reader = PipeReader(input_files={"pipe_read":pr}, - scheduler=global_event_loop()) - pipe_reader.start() - - eof = pipe_reader.poll() is not None + fifo_writer.start() + eof = fifo_writer.poll() is not None while not eof: - pipe_reader._wait_loop(timeout=self._COMMUNICATE_RETRY_TIMEOUT_MS) + fifo_writer._wait_loop(timeout=self._COMMUNICATE_RETRY_TIMEOUT_MS) - eof = pipe_reader.poll() is not None + eof = fifo_writer.poll() is not None if eof: break elif self._daemon_is_alive(): self._timeout_retry_msg(start_time, msg) else: - pipe_reader.cancel() + fifo_writer.cancel() self._no_daemon_msg() - try: - os.kill(pid, signal.SIGKILL) - os.waitpid(pid, 0) - except OSError as e: - portage.util.writemsg_level( - "ebuild-ipc: %s\n" % (e,), - level=logging.ERROR, noiselevel=-1) + fifo_writer.wait() return 2 - try: - wait_retval = os.waitpid(pid, 0) - except OSError as e: - portage.util.writemsg_level( - "ebuild-ipc: %s: %s\n" % (msg, e), - level=logging.ERROR, noiselevel=-1) - return 2 - - if not os.WIFEXITED(wait_retval[1]): - portage.util.writemsg_level( - "ebuild-ipc: %s: %s\n" % (msg, - portage.localization._('subprocess failure: %s') % \ - wait_retval[1]), - level=logging.ERROR, noiselevel=-1) - return 2 - - return os.WEXITSTATUS(wait_retval[1]) + return fifo_writer.wait() def _receive_reply(self, input_fd): @@ -218,31 +203,9 @@ class EbuildIpc(object): # un-interrupted, while the parent handles all timeout # considerations. This helps to avoid possible race conditions # from interference between timeouts and blocking IO operations. - pr, pw = os.pipe() - pid = os.fork() - - if pid == 0: - retval = 2 - try: - os.close(pr) - - # File streams are in unbuffered mode since we do atomic - # read and write of whole pickles. - output_file = open(self.ipc_in_fifo, 'wb', 0) - output_file.write(pickle.dumps(args)) - output_file.close() - retval = os.EX_OK - except SystemExit: - raise - except: - traceback.print_exc() - finally: - os._exit(retval) - - os.close(pw) - msg = portage.localization._('during write') - retval = self._wait(pid, pr, msg) + retval = self._run_writer(FifoWriter(buf=pickle.dumps(args), + fifo=self.ipc_in_fifo, scheduler=global_event_loop()), msg) if retval != os.EX_OK: portage.util.writemsg_level( -- cgit v1.2.3-1-g7c22