summaryrefslogtreecommitdiffstats
path: root/pym/portage/tests/process/test_poll.py
blob: e8de967f7e7a0db0f9161ccf8371a5cce4222f3f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# Copyright 1998-2008 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
# $Id: test_spawn.py 8474 2007-11-09 03:35:38Z zmedico $

import errno, os, sys
import fcntl
import termios
import portage
from portage.output import get_term_size, set_term_size
from portage.tests import TestCase
from _emerge import PipeReader, SpawnProcess, TaskScheduler

class PipeReaderTestCase(TestCase):

	def _create_pipe(self):
		return os.pipe()

	def _assertEqual(self, test_string, consumer_value):
		self.assertEqual(test_string, consumer_value)

	def testPipeReader(self):
		"""
		Use a poll loop to read data from a pipe and assert that
		the data written to the pipe is identical to the data
		read from the pipe.
		"""

		test_string = 2 * "blah blah blah\n"

		master_fd, slave_fd = self._create_pipe()
 		master_file = os.fdopen(master_fd, 'r')

		task_scheduler = TaskScheduler(max_jobs=2)
		scheduler = task_scheduler.sched_iface

		producer = SpawnProcess(
			args=["bash", "-c", "echo -n '%s'" % test_string],
			fd_pipes={1:slave_fd}, scheduler=scheduler)

		consumer = PipeReader(
			input_files={"producer" : master_file},
			scheduler=scheduler)

		task_scheduler.add(producer)
		task_scheduler.add(consumer)

		def producer_start_cb(task):
			os.close(slave_fd)

		producer.addStartListener(producer_start_cb)
		task_scheduler.run()

		self._assertEqual(test_string, consumer.getvalue())

class PtyReaderTestCase(PipeReaderTestCase):

	def _assertEqual(self, test_string, consumer_value):
		if test_string != consumer_value:
			# This test is expected to fail on some operating systems
			# such as Darwin that do not support poll() on pty devices.
			self.todo = True
		self.assertEqual(test_string, consumer_value)

	def _create_pipe(self):
		got_pty = False

		if portage._disable_openpty:
			master_fd, slave_fd = os.pipe()
		else:
			from pty import openpty
			try:
				master_fd, slave_fd = openpty()
				got_pty = True
			except EnvironmentError, e:
				portage._disable_openpty = True
				portage.writemsg("openpty failed: '%s'\n" % str(e),
					noiselevel=-1)
				del e
				master_fd, slave_fd = os.pipe()

		if got_pty:
			# Disable post-processing of output since otherwise weird
			# things like \n -> \r\n transformations may occur.
			mode = termios.tcgetattr(slave_fd)
			mode[1] &= ~termios.OPOST
			termios.tcsetattr(slave_fd, termios.TCSANOW, mode)

		if got_pty and sys.stdout.isatty():
			rows, columns = get_term_size()
			set_term_size(rows, columns, slave_fd)

		return (master_fd, slave_fd)