summaryrefslogtreecommitdiffstats
path: root/pym/portage/tests/process/test_PopenProcess.py
blob: 5ede164af920a58dc5f460f5b752a9c066c57886 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
# Copyright 2012 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2

import subprocess
import tempfile

from portage import os
from portage.tests import TestCase
from portage.util._async.PipeLogger import PipeLogger
from portage.util._async.PopenProcess import PopenProcess
from portage.util._eventloop.global_event_loop import global_event_loop
from _emerge.PipeReader import PipeReader

class PopenPipeTestCase(TestCase):
	"""
	Test PopenProcess, which can be useful for Jython support, since it
	uses the subprocess.Popen instead of os.fork().
	"""

	_echo_cmd = "echo -n '%s'"

	def _testPipeReader(self, test_string):
		"""
		Use a poll loop to read data from a pipe and assert that
		the data written to the pipe is identical to the data
		read from the pipe.
		"""

		producer = PopenProcess(proc=subprocess.Popen(
			["bash", "-c", self._echo_cmd % test_string],
			stdout=subprocess.PIPE, stderr=subprocess.STDOUT),
			pipe_reader=PipeReader(), scheduler=global_event_loop())

		consumer = producer.pipe_reader
		consumer.input_files = {"producer" : producer.proc.stdout}

		producer.start()
		producer.wait()

		self.assertEqual(producer.returncode, os.EX_OK)
		self.assertEqual(consumer.returncode, os.EX_OK)

		return consumer.getvalue().decode('ascii', 'replace')

	def _testPipeLogger(self, test_string):

		producer = PopenProcess(proc=subprocess.Popen(
			["bash", "-c", self._echo_cmd % test_string],
			stdout=subprocess.PIPE, stderr=subprocess.STDOUT),
			scheduler=global_event_loop())

		fd, log_file_path = tempfile.mkstemp()
		try:

			consumer = PipeLogger(background=True,
				input_fd=os.dup(producer.proc.stdout.fileno()),
				log_file_path=log_file_path)

			# Close the stdout pipe, since we duplicated it, and it
			# must be closed in order to avoid a ResourceWarning.
			producer.proc.stdout.close()
			producer.pipe_reader = consumer

			producer.start()
			producer.wait()

			self.assertEqual(producer.returncode, os.EX_OK)
			self.assertEqual(consumer.returncode, os.EX_OK)

			with open(log_file_path, 'rb') as f:
				content = f.read()

		finally:
			os.close(fd)
			os.unlink(log_file_path)

		return content.decode('ascii', 'replace')

	def testPopenPipe(self):
		for x in (1, 2, 5, 6, 7, 8, 2**5, 2**10, 2**12, 2**13, 2**14):
			test_string = x * "a"
			output = self._testPipeReader(test_string)
			self.assertEqual(test_string, output,
				"x = %s, len(output) = %s" % (x, len(output)))

			output = self._testPipeLogger(test_string)
			self.assertEqual(test_string, output,
				"x = %s, len(output) = %s" % (x, len(output)))