# Copyright 2012 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2

try:
    import threading
except ImportError:
    # dummy_threading will not suffice
    threading = None

from portage import os
from _emerge.AbstractPollTask import AbstractPollTask


class PipeReaderBlockingIO(AbstractPollTask):
    """
    Reads output from one or more files and saves it in memory, for
    retrieval via the getvalue() method. This is driven by a thread
    for each input file, in order to support blocking IO. This may
    be useful for using threads to handle blocking IO with Jython,
    since Jython lacks the fcntl module which is needed for
    non-blocking IO (see http://bugs.jython.org/issue1074).
    """

    # input_files:   mapping whose values are the file objects to read
    # _read_data:    list of chunks read so far (None after close())
    # _terminate:    threading.Event set by _cancel() to stop the readers
    # _threads:      file object -> reader thread, for readers not yet at EOF
    # _thread_rlock: guards _read_data and _threads across reader threads
    __slots__ = ("input_files", "_read_data", "_terminate",
        "_threads", "_thread_rlock")

    def _start(self):
        """
        Initialize the shared state and start one daemon reader thread
        for each value of self.input_files.
        """
        self._terminate = threading.Event()
        self._threads = {}
        self._read_data = []

        # _registered is not in this class's __slots__; presumably it is
        # declared by a base class -- TODO confirm.
        self._registered = True
        self._thread_rlock = threading.RLock()
        # Hold the lock while starting threads: a reader takes the same
        # lock before it touches _threads, so it cannot reach its EOF
        # bookkeeping (del self._threads[f]) before its own entry, and
        # those of its siblings, have been recorded here.
        with self._thread_rlock:
            for f in self.input_files.values():
                t = threading.Thread(target=self._reader_thread, args=(f,))
                t.daemon = True
                t.start()
                self._threads[f] = t

    def _reader_thread(self, f):
        """
        Body of a reader thread: read chunks of up to self._bufsize from
        f and append them to self._read_data until EOF or termination.

        On EOF the thread removes its own entry from self._threads, and
        the last thread to do so schedules self._eof via
        self.scheduler.idle_add. The file is closed when the thread
        exits, including when an exception escapes the read loop.

        @param f: the file object to read from (also its key in
            self._threads)
        """
        try:
            terminated = self._terminate.is_set
        except AttributeError:
            # Jython 2.7.0a2
            terminated = self._terminate.isSet
        bufsize = self._bufsize
        try:
            while not terminated():
                # Blocking read, deliberately done outside of the lock.
                buf = f.read(bufsize)
                with self._thread_rlock:
                    if terminated():
                        # Terminated while blocked in read(): discard
                        # the chunk that was just read.
                        break
                    elif buf:
                        self._read_data.append(buf)
                    else:
                        # An empty read means EOF for this file.
                        del self._threads[f]
                        if not self._threads:
                            # Thread-safe callback to EventLoop
                            self.scheduler.idle_add(self._eof)
                        break
        finally:
            # Close in a finally block so that the file is not leaked
            # if read() or the bookkeeping above raises.
            f.close()

    def _eof(self):
        """
        Callback scheduled by the last reader thread to reach EOF. If no
        returncode has been set yet, set it to os.EX_OK and call wait().

        @return: always False (presumably this tells the event loop not
            to invoke the idle callback again -- TODO confirm against
            the scheduler's idle_add contract)
        """
        self._registered = False
        if self.returncode is None:
            self.returncode = os.EX_OK
            # NOTE(review): indentation was reconstructed; confirm that
            # wait() belongs inside this conditional.
            self.wait()
        return False

    def _cancel(self):
        """
        Signal the reader threads to stop, set the returncode to
        self._cancelled_returncode if none has been set yet, and call
        wait().
        """
        self._terminate.set()
        self._registered = False
        if self.returncode is None:
            self.returncode = self._cancelled_returncode
        # NOTE(review): indentation was reconstructed; confirm that
        # wait() is meant to be called unconditionally here.
        self.wait()

    def _wait(self):
        """
        Return the returncode if it is already set. Otherwise run
        self._wait_loop(), then set the returncode to os.EX_OK and
        return it.

        @return: the value of self.returncode
        """
        if self.returncode is not None:
            return self.returncode
        self._wait_loop()
        self.returncode = os.EX_OK
        return self.returncode

    def getvalue(self):
        """
        Retrieve the entire contents read so far, joined into a single
        bytes object (assumes that the input files yield bytes -- TODO
        confirm). Calling this after close() raises TypeError, since
        the buffer has been discarded.
        """
        with self._thread_rlock:
            return b''.join(self._read_data)

    def close(self):
        """Free the memory buffer."""
        with self._thread_rlock:
            self._read_data = None