From 529a4762cfd8b650b8567be2f1423926d6cbaf9e Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Sat, 12 Jul 2008 07:32:55 +0000 Subject: Derive a PtyReaderTestCase from PipeReaderTestCase. This test is expected to fail on some operating systems such as Darwin that do not support poll() on pty devices. svn path=/main/trunk/; revision=11023 --- pym/portage/tests/process/test_poll.py | 55 ++++++++++++++++++++++++++++++++-- 1 file changed, 52 insertions(+), 3 deletions(-) (limited to 'pym/portage/tests/process') diff --git a/pym/portage/tests/process/test_poll.py b/pym/portage/tests/process/test_poll.py index 45c0ad03e..e8de967f7 100644 --- a/pym/portage/tests/process/test_poll.py +++ b/pym/portage/tests/process/test_poll.py @@ -3,10 +3,20 @@ # $Id: test_spawn.py 8474 2007-11-09 03:35:38Z zmedico $ import errno, os, sys +import fcntl +import termios +import portage +from portage.output import get_term_size, set_term_size from portage.tests import TestCase from _emerge import PipeReader, SpawnProcess, TaskScheduler -class PollTestCase(TestCase): +class PipeReaderTestCase(TestCase): + + def _create_pipe(self): + return os.pipe() + + def _assertEqual(self, test_string, consumer_value): + self.assertEqual(test_string, consumer_value) def testPipeReader(self): """ @@ -17,7 +27,7 @@ class PollTestCase(TestCase): test_string = 2 * "blah blah blah\n" - master_fd, slave_fd = os.pipe() + master_fd, slave_fd = self._create_pipe() master_file = os.fdopen(master_fd, 'r') task_scheduler = TaskScheduler(max_jobs=2) @@ -40,4 +50,43 @@ class PollTestCase(TestCase): producer.addStartListener(producer_start_cb) task_scheduler.run() - self.assertEqual(test_string, consumer.getvalue()) + self._assertEqual(test_string, consumer.getvalue()) + +class PtyReaderTestCase(PipeReaderTestCase): + + def _assertEqual(self, test_string, consumer_value): + if test_string != consumer_value: + # This test is expected to fail on some operating systems + # such as Darwin that do not support poll() on pty devices. + self.todo = True + self.assertEqual(test_string, consumer_value) + + def _create_pipe(self): + got_pty = False + + if portage._disable_openpty: + master_fd, slave_fd = os.pipe() + else: + from pty import openpty + try: + master_fd, slave_fd = openpty() + got_pty = True + except EnvironmentError, e: + portage._disable_openpty = True + portage.writemsg("openpty failed: '%s'\n" % str(e), + noiselevel=-1) + del e + master_fd, slave_fd = os.pipe() + + if got_pty: + # Disable post-processing of output since otherwise weird + # things like \n -> \r\n transformations may occur. + mode = termios.tcgetattr(slave_fd) + mode[1] &= ~termios.OPOST + termios.tcsetattr(slave_fd, termios.TCSANOW, mode) + + if got_pty and sys.stdout.isatty(): + rows, columns = get_term_size() + set_term_size(rows, columns, slave_fd) + + return (master_fd, slave_fd) -- cgit v1.2.3-1-g7c22