diff options
author | Zac Medico <zmedico@gentoo.org> | 2012-12-27 00:15:22 -0800 |
---|---|---|
committer | Zac Medico <zmedico@gentoo.org> | 2012-12-27 00:17:37 -0800 |
commit | 684e0f9fa1ed5039dc51e7854bd9cb318a2eaafe (patch) | |
tree | c58874bceab7a0300d23ab7bef15e17f85b3fb47 /pym/portage/util | |
parent | 9f65889534279731f4b00af243c1edc885eece09 (diff) | |
download | portage-684e0f9fa1ed5039dc51e7854bd9cb318a2eaafe.tar.gz portage-684e0f9fa1ed5039dc51e7854bd9cb318a2eaafe.tar.bz2 portage-684e0f9fa1ed5039dc51e7854bd9cb318a2eaafe.zip |
Add PipeReaderBlockingIO, Jython experimentation.
Diffstat (limited to 'pym/portage/util')
-rw-r--r-- | pym/portage/util/_async/PipeReaderBlockingIO.py | 87 | ||||
-rw-r--r-- | pym/portage/util/_eventloop/EventLoop.py | 16 |
2 files changed, 100 insertions, 3 deletions
diff --git a/pym/portage/util/_async/PipeReaderBlockingIO.py b/pym/portage/util/_async/PipeReaderBlockingIO.py new file mode 100644 index 000000000..8ce2ec51b --- /dev/null +++ b/pym/portage/util/_async/PipeReaderBlockingIO.py @@ -0,0 +1,87 @@ +# Copyright 2012 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import threading + +from portage import os +from _emerge.AbstractPollTask import AbstractPollTask + +class PipeReaderBlockingIO(AbstractPollTask): + """ + Reads output from one or more files and saves it in memory, for + retrieval via the getvalue() method. This is driven by a thread + for each input file, in order to support blocking IO. This may + be useful for using threads to handle blocking IO with Jython, + since Jython lacks the fcntl module which is needed for + non-blocking IO (see http://bugs.jython.org/issue1074). + """ + + __slots__ = ("input_files", "_read_data", "_terminate", + "_threads", "_thread_rlock") + + def _start(self): + self._terminate = threading.Event() + self._threads = {} + self._read_data = [] + + self._registered = True + self._thread_rlock = threading.RLock() + with self._thread_rlock: + for f in self.input_files.values(): + t = threading.Thread(target=self._reader_thread, args=(f,)) + t.daemon = True + t.start() + self._threads[f] = t + + def _reader_thread(self, f): + try: + terminated = self._terminate.is_set + except AttributeError: + # Jython 2.7.0a2 + terminated = self._terminate.isSet + bufsize = self._bufsize + while not terminated(): + buf = f.read(bufsize) + with self._thread_rlock: + if terminated(): + break + elif buf: + self._read_data.append(buf) + else: + del self._threads[f] + if not self._threads: + # Thread-safe callback to EventLoop + self.scheduler.idle_add(self._eof) + break + f.close() + + def _eof(self): + self._registered = False + if self.returncode is None: + self.returncode = os.EX_OK + self.wait() + return False + + def _cancel(self): + self._terminate.set() + self._registered = False + if self.returncode is None: + self.returncode = self._cancelled_returncode + self.wait() + + def _wait(self): + if self.returncode is not None: + return self.returncode + self._wait_loop() + self.returncode = os.EX_OK + return self.returncode + + def getvalue(self): + """Retrieve the entire contents""" + with self._thread_rlock: + return b''.join(self._read_data) + + def close(self): + """Free the memory buffer.""" + with self._thread_rlock: + self._read_data = None diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py index 37e600792..efd1f1376 100644 --- a/pym/portage/util/_eventloop/EventLoop.py +++ b/pym/portage/util/_eventloop/EventLoop.py @@ -2,7 +2,6 @@ # Distributed under the terms of the GNU General Public License v2 import errno -import fcntl import logging import os import select @@ -10,6 +9,12 @@ import signal import time try: + import fcntl +except ImportError: + # http://bugs.jython.org/issue1074 + fcntl = None + +try: import threading except ImportError: import dummy_threading as threading @@ -54,7 +59,7 @@ class EventLoop(object): that global_event_loop does not need constructor arguments) @type main: bool """ - self._use_signal = main + self._use_signal = main and fcntl is not None self._thread_rlock = threading.RLock() self._poll_event_queue = [] self._poll_event_handlers = {} @@ -524,7 +529,12 @@ def can_poll_device(): return _can_poll_device p = select.poll() - p.register(dev_null.fileno(), PollConstants.POLLIN) + try: + p.register(dev_null.fileno(), PollConstants.POLLIN) + except TypeError: + # Jython: Object 'org.python.core.io.FileIO@f8f175' is not watchable + _can_poll_device = False + return _can_poll_device invalid_request = False for f, event in p.poll(): |