summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2012-12-27 00:15:22 -0800
committerZac Medico <zmedico@gentoo.org>2012-12-27 00:17:37 -0800
commit684e0f9fa1ed5039dc51e7854bd9cb318a2eaafe (patch)
treec58874bceab7a0300d23ab7bef15e17f85b3fb47
parent9f65889534279731f4b00af243c1edc885eece09 (diff)
downloadportage-684e0f9fa1ed5039dc51e7854bd9cb318a2eaafe.tar.gz
portage-684e0f9fa1ed5039dc51e7854bd9cb318a2eaafe.tar.bz2
portage-684e0f9fa1ed5039dc51e7854bd9cb318a2eaafe.zip
Add PipeReaderBlockingIO, Jython experimentation.
-rw-r--r--.gitignore1
-rw-r--r--pym/portage/tests/process/test_PopenProcessBlockingIO.py62
-rw-r--r--pym/portage/util/_async/PipeReaderBlockingIO.py87
-rw-r--r--pym/portage/util/_eventloop/EventLoop.py16
4 files changed, 163 insertions, 3 deletions
diff --git a/.gitignore b/.gitignore
index 8da61905a..2236c6379 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,3 @@
*.py[co]
+*.class
/tags
diff --git a/pym/portage/tests/process/test_PopenProcessBlockingIO.py b/pym/portage/tests/process/test_PopenProcessBlockingIO.py
new file mode 100644
index 000000000..9cdad326d
--- /dev/null
+++ b/pym/portage/tests/process/test_PopenProcessBlockingIO.py
@@ -0,0 +1,62 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import subprocess
+
+try:
+ import threading
+except ImportError:
+ threading = None
+
+from portage import os
+from portage.tests import TestCase
+from portage.util._async.PopenProcess import PopenProcess
+from portage.util._eventloop.global_event_loop import global_event_loop
+from portage.util._async.PipeReaderBlockingIO import PipeReaderBlockingIO
+
+class PopenPipeBlockingIOTestCase(TestCase):
+ """
+ Test PopenProcess, which can be useful for Jython support:
+ * use subprocess.Popen since Jython does not support os.fork()
+ * use blocking IO with threads, since Jython does not support
+ fcntl non-blocking IO)
+ """
+
+ _echo_cmd = "echo -n '%s'"
+
+ def _testPipeReader(self, test_string):
+ """
+ Use a poll loop to read data from a pipe and assert that
+ the data written to the pipe is identical to the data
+ read from the pipe.
+ """
+
+ producer = PopenProcess(proc=subprocess.Popen(
+ ["bash", "-c", self._echo_cmd % test_string],
+ stdout=subprocess.PIPE, stderr=subprocess.STDOUT),
+ pipe_reader=PipeReaderBlockingIO(), scheduler=global_event_loop())
+
+ consumer = producer.pipe_reader
+ consumer.input_files = {"producer" : producer.proc.stdout}
+
+ producer.start()
+ producer.wait()
+
+ self.assertEqual(producer.returncode, os.EX_OK)
+ self.assertEqual(consumer.returncode, os.EX_OK)
+
+ return consumer.getvalue().decode('ascii', 'replace')
+
+ def testPopenPipeBlockingIO(self):
+
+ if threading is None:
+ skip_reason = "threading disabled"
+ self.portage_skip = "threading disabled"
+ self.assertFalse(True, skip_reason)
+ return
+
+ for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14):
+ test_string = x * "a"
+ output = self._testPipeReader(test_string)
+ self.assertEqual(test_string, output,
+ "x = %s, len(output) = %s" % (x, len(output)))
diff --git a/pym/portage/util/_async/PipeReaderBlockingIO.py b/pym/portage/util/_async/PipeReaderBlockingIO.py
new file mode 100644
index 000000000..8ce2ec51b
--- /dev/null
+++ b/pym/portage/util/_async/PipeReaderBlockingIO.py
@@ -0,0 +1,87 @@
+# Copyright 2012 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import threading
+
+from portage import os
+from _emerge.AbstractPollTask import AbstractPollTask
+
+class PipeReaderBlockingIO(AbstractPollTask):
+ """
+ Reads output from one or more files and saves it in memory, for
+ retrieval via the getvalue() method. This is driven by a thread
+ for each input file, in order to support blocking IO. This may
+ be useful for using threads to handle blocking IO with Jython,
+ since Jython lacks the fcntl module which is needed for
+ non-blocking IO (see http://bugs.jython.org/issue1074).
+ """
+
+ __slots__ = ("input_files", "_read_data", "_terminate",
+ "_threads", "_thread_rlock")
+
+ def _start(self):
+ self._terminate = threading.Event()
+ self._threads = {}
+ self._read_data = []
+
+ self._registered = True
+ self._thread_rlock = threading.RLock()
+ with self._thread_rlock:
+ for f in self.input_files.values():
+ t = threading.Thread(target=self._reader_thread, args=(f,))
+ t.daemon = True
+ t.start()
+ self._threads[f] = t
+
+ def _reader_thread(self, f):
+ try:
+ terminated = self._terminate.is_set
+ except AttributeError:
+ # Jython 2.7.0a2
+ terminated = self._terminate.isSet
+ bufsize = self._bufsize
+ while not terminated():
+ buf = f.read(bufsize)
+ with self._thread_rlock:
+ if terminated():
+ break
+ elif buf:
+ self._read_data.append(buf)
+ else:
+ del self._threads[f]
+ if not self._threads:
+ # Thread-safe callback to EventLoop
+ self.scheduler.idle_add(self._eof)
+ break
+ f.close()
+
+ def _eof(self):
+ self._registered = False
+ if self.returncode is None:
+ self.returncode = os.EX_OK
+ self.wait()
+ return False
+
+ def _cancel(self):
+ self._terminate.set()
+ self._registered = False
+ if self.returncode is None:
+ self.returncode = self._cancelled_returncode
+ self.wait()
+
+ def _wait(self):
+ if self.returncode is not None:
+ return self.returncode
+ self._wait_loop()
+ self.returncode = os.EX_OK
+ return self.returncode
+
+ def getvalue(self):
+ """Retrieve the entire contents"""
+ with self._thread_rlock:
+ return b''.join(self._read_data)
+
+ def close(self):
+ """Free the memory buffer."""
+ with self._thread_rlock:
+ self._read_data = None
diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py
index 37e600792..efd1f1376 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -2,7 +2,6 @@
# Distributed under the terms of the GNU General Public License v2
import errno
-import fcntl
import logging
import os
import select
@@ -10,6 +9,12 @@ import signal
import time
try:
+ import fcntl
+except ImportError:
+ # http://bugs.jython.org/issue1074
+ fcntl = None
+
+try:
import threading
except ImportError:
import dummy_threading as threading
@@ -54,7 +59,7 @@ class EventLoop(object):
that global_event_loop does not need constructor arguments)
@type main: bool
"""
- self._use_signal = main
+ self._use_signal = main and fcntl is not None
self._thread_rlock = threading.RLock()
self._poll_event_queue = []
self._poll_event_handlers = {}
@@ -524,7 +529,12 @@ def can_poll_device():
return _can_poll_device
p = select.poll()
- p.register(dev_null.fileno(), PollConstants.POLLIN)
+ try:
+ p.register(dev_null.fileno(), PollConstants.POLLIN)
+ except TypeError:
+ # Jython: Object 'org.python.core.io.FileIO@f8f175' is not watchable
+ _can_poll_device = False
+ return _can_poll_device
invalid_request = False
for f, event in p.poll():