From 915348ce34fc499ac295b8f0ffee9f0829803542 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Thu, 15 Dec 2011 15:55:42 -0800 Subject: test_poll: test different sizes, and pty too --- pym/_emerge/PipeReader.py | 4 +++- pym/portage/tests/process/test_poll.py | 23 +++++++++++++++++------ 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/pym/_emerge/PipeReader.py b/pym/_emerge/PipeReader.py index af8cfdd4e..78acb72df 100644 --- a/pym/_emerge/PipeReader.py +++ b/pym/_emerge/PipeReader.py @@ -64,7 +64,9 @@ class PipeReader(AbstractPollTask): try: data = os.read(fd, self._bufsize) except OSError as e: - if e.errno not in (errno.EAGAIN,): + # EIO happens with pty on Linux after the + # slave end of the pty has been closed. + if e.errno not in (errno.EAGAIN, errno.EIO): raise break else: diff --git a/pym/portage/tests/process/test_poll.py b/pym/portage/tests/process/test_poll.py index c3b50d4be..e7a47028f 100644 --- a/pym/portage/tests/process/test_poll.py +++ b/pym/portage/tests/process/test_poll.py @@ -3,25 +3,28 @@ from portage import os from portage.tests import TestCase +from portage.util._pty import _create_pty_or_pipe from _emerge.PollScheduler import PollScheduler from _emerge.PipeReader import PipeReader from _emerge.SpawnProcess import SpawnProcess class PipeReaderTestCase(TestCase): - def testPipeReader(self): + def _testPipeReader(self, test_string, use_pty): """ Use a poll loop to read data from a pipe and assert that the data written to the pipe is identical to the data read from the pipe. """ - test_string = 2 * "blah blah blah\n" - scheduler = PollScheduler().sched_iface - master_fd, slave_fd = os.pipe() + if use_pty: + got_pty, master_fd, slave_fd = _create_pty_or_pipe() + else: + got_pty = False + master_fd, slave_fd = os.pipe() master_file = os.fdopen(master_fd, 'rb', 0) - slave_file = os.fdopen(slave_fd, 'wb') + slave_file = os.fdopen(slave_fd, 'wb', 0) producer = SpawnProcess( args=["bash", "-c", "echo -n '%s'" % test_string], env=os.environ, fd_pipes={1:slave_fd}, @@ -44,4 +47,12 @@ class PipeReaderTestCase(TestCase): self.assertEqual(consumer.returncode, os.EX_OK) output = consumer.getvalue().decode('ascii', 'replace') - self.assertEqual(test_string, output) + return (output, got_pty) + + def testPipeReader(self): + for use_pty in (False, True): + for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14): + test_string = x * "a" + output, got_pty = self._testPipeReader(test_string, use_pty) + self.assertEqual(test_string, output, + "x = %s, use_pty = %s, got_pty = %s" % (x, use_pty, got_pty)) -- cgit v1.2.3-1-g7c22