summaryrefslogtreecommitdiffstats
path: root/pym/_emerge/PipeReader.py
blob: be93be323ac77c8b571880560fb6c752a100f073 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
# Copyright 1999-2013 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2

from portage import os
from _emerge.AbstractPollTask import AbstractPollTask
import fcntl

class PipeReader(AbstractPollTask):

	"""
	Reads output from one or more files and saves it in memory,
	for retrieval via the getvalue() method. This is driven by
	the scheduler's poll() loop, so it runs entirely within the
	current process.
	"""

	__slots__ = ("input_files",) + \
		("_read_data", "_reg_ids", "_use_array")

	def _start(self):
		self._reg_ids = set()
		self._read_data = []

		if self._use_array:
			output_handler = self._array_output_handler
		else:
			output_handler = self._output_handler

		fcntl_flags = os.O_NONBLOCK
		try:
			fcntl.FD_CLOEXEC
		except AttributeError:
			pass
		else:
			fcntl_flags |= fcntl.FD_CLOEXEC

		for f in self.input_files.values():
			fd = isinstance(f, int) and f or f.fileno()
			fcntl.fcntl(fd, fcntl.F_SETFL,
				fcntl.fcntl(fd, fcntl.F_GETFL) | fcntl_flags)
			self._reg_ids.add(self.scheduler.io_add_watch(fd,
				self._registered_events, output_handler))
		self._registered = True

	def _cancel(self):
		self._unregister()
		if self.returncode is None:
			self.returncode = self._cancelled_returncode

	def _wait(self):
		if self.returncode is not None:
			return self.returncode
		self._wait_loop()
		self.returncode = os.EX_OK
		return self.returncode

	def getvalue(self):
		"""Retrieve the entire contents"""
		return b''.join(self._read_data)

	def close(self):
		"""Free the memory buffer."""
		self._read_data = None

	def _output_handler(self, fd, event):

		while True:
			data = self._read_buf(fd, event)
			if data is None:
				break
			if data:
				self._read_data.append(data)
			else:
				self._unregister()
				self.wait()
				break

		self._unregister_if_appropriate(event)

		return True

	def _array_output_handler(self, fd, event):

		for f in self.input_files.values():
			if f.fileno() == fd:
				break

		while True:
			data = self._read_array(f, event)
			if data is None:
				break
			if data:
				self._read_data.append(data)
			else:
				self._unregister()
				self.wait()
				break

		self._unregister_if_appropriate(event)

		return True

	def _unregister(self):
		"""
		Unregister from the scheduler and close open files.
		"""

		self._registered = False

		if self._reg_ids is not None:
			for reg_id in self._reg_ids:
				self.scheduler.source_remove(reg_id)
			self._reg_ids = None

		if self.input_files is not None:
			for f in self.input_files.values():
				if isinstance(f, int):
					os.close(f)
				else:
					f.close()
			self.input_files = None