From 684e0f9fa1ed5039dc51e7854bd9cb318a2eaafe Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Thu, 27 Dec 2012 00:15:22 -0800 Subject: Add PipeReaderBlockingIO, Jython experimentation. --- pym/portage/util/_async/PipeReaderBlockingIO.py | 87 +++++++++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 pym/portage/util/_async/PipeReaderBlockingIO.py (limited to 'pym/portage/util/_async') diff --git a/pym/portage/util/_async/PipeReaderBlockingIO.py b/pym/portage/util/_async/PipeReaderBlockingIO.py new file mode 100644 index 000000000..8ce2ec51b --- /dev/null +++ b/pym/portage/util/_async/PipeReaderBlockingIO.py @@ -0,0 +1,87 @@ +# Copyright 2012 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import threading + +from portage import os +from _emerge.AbstractPollTask import AbstractPollTask + +class PipeReaderBlockingIO(AbstractPollTask): + """ + Reads output from one or more files and saves it in memory, for + retrieval via the getvalue() method. This is driven by a thread + for each input file, in order to support blocking IO. This may + be useful for using threads to handle blocking IO with Jython, + since Jython lacks the fcntl module which is needed for + non-blocking IO (see http://bugs.jython.org/issue1074). + """ + + __slots__ = ("input_files", "_read_data", "_terminate", + "_threads", "_thread_rlock") + + def _start(self): + self._terminate = threading.Event() + self._threads = {} + self._read_data = [] + + self._registered = True + self._thread_rlock = threading.RLock() + with self._thread_rlock: + for f in self.input_files.values(): + t = threading.Thread(target=self._reader_thread, args=(f,)) + t.daemon = True + t.start() + self._threads[f] = t + + def _reader_thread(self, f): + try: + terminated = self._terminate.is_set + except AttributeError: + # Jython 2.7.0a2 + terminated = self._terminate.isSet + bufsize = self._bufsize + while not terminated(): + buf = f.read(bufsize) + with self._thread_rlock: + if terminated(): + break + elif buf: + self._read_data.append(buf) + else: + del self._threads[f] + if not self._threads: + # Thread-safe callback to EventLoop + self.scheduler.idle_add(self._eof) + break + f.close() + + def _eof(self): + self._registered = False + if self.returncode is None: + self.returncode = os.EX_OK + self.wait() + return False + + def _cancel(self): + self._terminate.set() + self._registered = False + if self.returncode is None: + self.returncode = self._cancelled_returncode + self.wait() + + def _wait(self): + if self.returncode is not None: + return self.returncode + self._wait_loop() + self.returncode = os.EX_OK + return self.returncode + + def getvalue(self): + """Retrieve the entire contents""" + with self._thread_rlock: + return b''.join(self._read_data) + + def close(self): + """Free the memory buffer.""" + with self._thread_rlock: + self._read_data = None -- cgit v1.2.3-1-g7c22