summaryrefslogtreecommitdiffstats
path: root/pym/portage/util/_async/PipeReaderBlockingIO.py
blob: b06adf6edb9943eee6f68328285c41d8c12a07a5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
# Copyright 2012 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2

try:
	import threading
except ImportError:
	# dummy_threading will not suffice
	threading = None

from portage import os
from _emerge.AbstractPollTask import AbstractPollTask

class PipeReaderBlockingIO(AbstractPollTask):
	"""
	Reads output from one or more files and saves it in memory, for
	retrieval via the getvalue() method. This is driven by a thread
	for each input file, in order to support blocking IO.  This may
	be useful for using threads to handle blocking IO with Jython,
	since Jython lacks the fcntl module which is needed for
	non-blocking IO (see http://bugs.jython.org/issue1074).

	Attributes (see __slots__):
		input_files: mapping whose values are the file objects to read;
			it must be assigned by the caller before the task is started.
		_read_data: list of chunks collected by the reader threads, or
			None after close() has been called.
		_terminate: threading.Event that is set by _cancel() in order
			to ask the reader threads to stop.
		_threads: dict mapping each input file to the thread that is
			still reading it.
		_thread_rlock: threading.RLock guarding _read_data and _threads.
	"""

	__slots__ = ("input_files", "_read_data", "_terminate",
		"_threads", "_thread_rlock")

	def _start(self):
		"""
		Initialize the shared state and spawn one daemon reader thread
		for each value of self.input_files.
		"""
		self._terminate = threading.Event()
		self._threads = {}
		self._read_data = []

		# NOTE(review): _registered is not declared in this class's
		# __slots__, so it is presumably provided by a base class.
		self._registered = True
		self._thread_rlock = threading.RLock()
		# Hold the lock while spawning, so that a reader which reaches
		# EOF immediately can not execute "del self._threads[f]" before
		# its entry has been stored here.
		with self._thread_rlock:
			for f in self.input_files.values():
				t = threading.Thread(target=self._reader_thread, args=(f,))
				t.daemon = True
				t.start()
				self._threads[f] = t

	def _reader_thread(self, f):
		"""
		Thread target which reads f in chunks of self._bufsize until
		EOF or until the terminate event is set. Each non-empty chunk
		is appended to self._read_data. The reader which removes the
		last entry from self._threads schedules self._eof via
		self.scheduler.idle_add().

		@param f: file object to read from; it is always closed before
			this method returns, even if read() raises an exception.
		"""
		try:
			terminated = self._terminate.is_set
		except AttributeError:
			# Jython 2.7.0a2
			terminated = self._terminate.isSet
		bufsize = self._bufsize
		try:
			while not terminated():
				# Blocking read, performed without holding the lock.
				buf = f.read(bufsize)
				with self._thread_rlock:
					if terminated():
						# Cancelled while blocked in read(), so the
						# data that was just read is discarded.
						break
					elif buf:
						self._read_data.append(buf)
					else:
						# An empty read result is treated as EOF.
						del self._threads[f]
						if not self._threads:
							# Thread-safe callback to EventLoop
							self.scheduler.idle_add(self._eof)
						break
		finally:
			# Close the file even when read() raises, so that the
			# file object is not leaked by an aborted reader. Note
			# that in this case the exception still propagates, and
			# this thread's entry is left in self._threads.
			f.close()

	def _eof(self):
		"""
		Callback scheduled by the last reader thread that reaches EOF.
		Clears _registered, sets returncode to os.EX_OK unless a
		returncode has already been set, and calls self.wait().

		@return: always False (presumably so that the scheduler does
			not call this idle callback again -- TODO confirm against
			the idle_add implementation)
		"""
		self._registered = False
		if self.returncode is None:
			self.returncode = os.EX_OK
		self.wait()
		return False

	def _cancel(self):
		"""
		Set the terminate event, clear _registered, set returncode to
		self._cancelled_returncode unless a returncode has already been
		set, and call self.wait(). A reader thread only checks the
		terminate event before and after each read() call, so a thread
		that is blocked in read() does not stop until that call returns.
		"""
		self._terminate.set()
		self._registered = False
		if self.returncode is None:
			self.returncode = self._cancelled_returncode
		self.wait()

	def _wait(self):
		"""
		Return the returncode, calling self._wait_loop() first if it
		has not been set yet.

		@return: self.returncode
		"""
		if self.returncode is not None:
			return self.returncode
		self._wait_loop()
		# If _eof or _cancel executed while _wait_loop was running,
		# then the returncode is already set, and it must not be
		# replaced (this mirrors the guards in _eof and _cancel).
		if self.returncode is None:
			self.returncode = os.EX_OK
		return self.returncode

	def getvalue(self):
		"""
		Retrieve the entire contents that have been read so far, as
		a single bytes object. This must not be called after close(),
		since close() replaces the buffer with None.
		"""
		with self._thread_rlock:
			return b''.join(self._read_data)

	def close(self):
		"""Free the memory buffer."""
		with self._thread_rlock:
			self._read_data = None