diff options
author | Zac Medico <zmedico@gentoo.org> | 2012-12-27 00:15:22 -0800 |
---|---|---|
committer | Zac Medico <zmedico@gentoo.org> | 2012-12-27 00:17:37 -0800 |
commit | 684e0f9fa1ed5039dc51e7854bd9cb318a2eaafe (patch) | |
tree | c58874bceab7a0300d23ab7bef15e17f85b3fb47 /pym/portage/tests | |
parent | 9f65889534279731f4b00af243c1edc885eece09 (diff) | |
download | portage-684e0f9fa1ed5039dc51e7854bd9cb318a2eaafe.tar.gz portage-684e0f9fa1ed5039dc51e7854bd9cb318a2eaafe.tar.bz2 portage-684e0f9fa1ed5039dc51e7854bd9cb318a2eaafe.zip |
Add PipeReaderBlockingIO, Jython experimentation.
Diffstat (limited to 'pym/portage/tests')
-rw-r--r-- | pym/portage/tests/process/test_PopenProcessBlockingIO.py | 62 |
1 files changed, 62 insertions, 0 deletions
diff --git a/pym/portage/tests/process/test_PopenProcessBlockingIO.py b/pym/portage/tests/process/test_PopenProcessBlockingIO.py new file mode 100644 index 000000000..9cdad326d --- /dev/null +++ b/pym/portage/tests/process/test_PopenProcessBlockingIO.py @@ -0,0 +1,62 @@ +# Copyright 2012 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import subprocess + +try: + import threading +except ImportError: + threading = None + +from portage import os +from portage.tests import TestCase +from portage.util._async.PopenProcess import PopenProcess +from portage.util._eventloop.global_event_loop import global_event_loop +from portage.util._async.PipeReaderBlockingIO import PipeReaderBlockingIO + +class PopenPipeBlockingIOTestCase(TestCase): + """ + Test PopenProcess, which can be useful for Jython support: + * use subprocess.Popen since Jython does not support os.fork() + * use blocking IO with threads, since Jython does not support + fcntl non-blocking IO) + """ + + _echo_cmd = "echo -n '%s'" + + def _testPipeReader(self, test_string): + """ + Use a poll loop to read data from a pipe and assert that + the data written to the pipe is identical to the data + read from the pipe. + """ + + producer = PopenProcess(proc=subprocess.Popen( + ["bash", "-c", self._echo_cmd % test_string], + stdout=subprocess.PIPE, stderr=subprocess.STDOUT), + pipe_reader=PipeReaderBlockingIO(), scheduler=global_event_loop()) + + consumer = producer.pipe_reader + consumer.input_files = {"producer" : producer.proc.stdout} + + producer.start() + producer.wait() + + self.assertEqual(producer.returncode, os.EX_OK) + self.assertEqual(consumer.returncode, os.EX_OK) + + return consumer.getvalue().decode('ascii', 'replace') + + def testPopenPipeBlockingIO(self): + + if threading is None: + skip_reason = "threading disabled" + self.portage_skip = "threading disabled" + self.assertFalse(True, skip_reason) + return + + for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14): + test_string = x * "a" + output = self._testPipeReader(test_string) + self.assertEqual(test_string, output, + "x = %s, len(output) = %s" % (x, len(output))) |