From 684e0f9fa1ed5039dc51e7854bd9cb318a2eaafe Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Thu, 27 Dec 2012 00:15:22 -0800 Subject: Add PipeReaderBlockingIO, Jython experimentation. --- .../tests/process/test_PopenProcessBlockingIO.py | 62 +++++++++++++++ pym/portage/util/_async/PipeReaderBlockingIO.py | 87 ++++++++++++++++++++++ pym/portage/util/_eventloop/EventLoop.py | 16 +++- 3 files changed, 162 insertions(+), 3 deletions(-) create mode 100644 pym/portage/tests/process/test_PopenProcessBlockingIO.py create mode 100644 pym/portage/util/_async/PipeReaderBlockingIO.py (limited to 'pym/portage') diff --git a/pym/portage/tests/process/test_PopenProcessBlockingIO.py b/pym/portage/tests/process/test_PopenProcessBlockingIO.py new file mode 100644 index 000000000..9cdad326d --- /dev/null +++ b/pym/portage/tests/process/test_PopenProcessBlockingIO.py @@ -0,0 +1,62 @@ +# Copyright 2012 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import subprocess + +try: + import threading +except ImportError: + threading = None + +from portage import os +from portage.tests import TestCase +from portage.util._async.PopenProcess import PopenProcess +from portage.util._eventloop.global_event_loop import global_event_loop +from portage.util._async.PipeReaderBlockingIO import PipeReaderBlockingIO + +class PopenPipeBlockingIOTestCase(TestCase): + """ + Test PopenProcess, which can be useful for Jython support: + * use subprocess.Popen since Jython does not support os.fork() + * use blocking IO with threads, since Jython does not support + fcntl non-blocking IO) + """ + + _echo_cmd = "echo -n '%s'" + + def _testPipeReader(self, test_string): + """ + Use a poll loop to read data from a pipe and assert that + the data written to the pipe is identical to the data + read from the pipe. + """ + + producer = PopenProcess(proc=subprocess.Popen( + ["bash", "-c", self._echo_cmd % test_string], + stdout=subprocess.PIPE, stderr=subprocess.STDOUT), + pipe_reader=PipeReaderBlockingIO(), scheduler=global_event_loop()) + + consumer = producer.pipe_reader + consumer.input_files = {"producer" : producer.proc.stdout} + + producer.start() + producer.wait() + + self.assertEqual(producer.returncode, os.EX_OK) + self.assertEqual(consumer.returncode, os.EX_OK) + + return consumer.getvalue().decode('ascii', 'replace') + + def testPopenPipeBlockingIO(self): + + if threading is None: + skip_reason = "threading disabled" + self.portage_skip = "threading disabled" + self.assertFalse(True, skip_reason) + return + + for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14): + test_string = x * "a" + output = self._testPipeReader(test_string) + self.assertEqual(test_string, output, + "x = %s, len(output) = %s" % (x, len(output))) diff --git a/pym/portage/util/_async/PipeReaderBlockingIO.py b/pym/portage/util/_async/PipeReaderBlockingIO.py new file mode 100644 index 000000000..8ce2ec51b --- /dev/null +++ b/pym/portage/util/_async/PipeReaderBlockingIO.py @@ -0,0 +1,87 @@ +# Copyright 2012 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import threading + +from portage import os +from _emerge.AbstractPollTask import AbstractPollTask + +class PipeReaderBlockingIO(AbstractPollTask): + """ + Reads output from one or more files and saves it in memory, for + retrieval via the getvalue() method. This is driven by a thread + for each input file, in order to support blocking IO. This may + be useful for using threads to handle blocking IO with Jython, + since Jython lacks the fcntl module which is needed for + non-blocking IO (see http://bugs.jython.org/issue1074). + """ + + __slots__ = ("input_files", "_read_data", "_terminate", + "_threads", "_thread_rlock") + + def _start(self): + self._terminate = threading.Event() + self._threads = {} + self._read_data = [] + + self._registered = True + self._thread_rlock = threading.RLock() + with self._thread_rlock: + for f in self.input_files.values(): + t = threading.Thread(target=self._reader_thread, args=(f,)) + t.daemon = True + t.start() + self._threads[f] = t + + def _reader_thread(self, f): + try: + terminated = self._terminate.is_set + except AttributeError: + # Jython 2.7.0a2 + terminated = self._terminate.isSet + bufsize = self._bufsize + while not terminated(): + buf = f.read(bufsize) + with self._thread_rlock: + if terminated(): + break + elif buf: + self._read_data.append(buf) + else: + del self._threads[f] + if not self._threads: + # Thread-safe callback to EventLoop + self.scheduler.idle_add(self._eof) + break + f.close() + + def _eof(self): + self._registered = False + if self.returncode is None: + self.returncode = os.EX_OK + self.wait() + return False + + def _cancel(self): + self._terminate.set() + self._registered = False + if self.returncode is None: + self.returncode = self._cancelled_returncode + self.wait() + + def _wait(self): + if self.returncode is not None: + return self.returncode + self._wait_loop() + self.returncode = os.EX_OK + return self.returncode + + def getvalue(self): + """Retrieve the entire contents""" + with self._thread_rlock: + return b''.join(self._read_data) + + def close(self): + """Free the memory buffer.""" + with self._thread_rlock: + self._read_data = None diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py index 37e600792..efd1f1376 100644 --- a/pym/portage/util/_eventloop/EventLoop.py +++ b/pym/portage/util/_eventloop/EventLoop.py @@ -2,13 +2,18 @@ # Distributed under the terms of the GNU General Public License v2 import errno -import fcntl import logging import os import select import signal import time +try: + import fcntl +except ImportError: + # http://bugs.jython.org/issue1074 + fcntl = None + try: import threading except ImportError: @@ -54,7 +59,7 @@ class EventLoop(object): that global_event_loop does not need constructor arguments) @type main: bool """ - self._use_signal = main + self._use_signal = main and fcntl is not None self._thread_rlock = threading.RLock() self._poll_event_queue = [] self._poll_event_handlers = {} @@ -524,7 +529,12 @@ def can_poll_device(): return _can_poll_device p = select.poll() - p.register(dev_null.fileno(), PollConstants.POLLIN) + try: + p.register(dev_null.fileno(), PollConstants.POLLIN) + except TypeError: + # Jython: Object 'org.python.core.io.FileIO@f8f175' is not watchable + _can_poll_device = False + return _can_poll_device invalid_request = False for f, event in p.poll(): -- cgit v1.2.3-1-g7c22