1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
|
# Copyright 2010-2011 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
import dummy_threading
import fcntl
import sys
try:
import threading
except ImportError:
import dummy_threading as threading
import portage
from portage import os
from portage.exception import TryAgain
from portage.locks import lockfile, unlockfile
from _emerge.AbstractPollTask import AbstractPollTask
from _emerge.AsynchronousTask import AsynchronousTask
from _emerge.PollConstants import PollConstants
from _emerge.SpawnProcess import SpawnProcess
class AsynchronousLock(AsynchronousTask):
    """
    This uses the portage.locks module to acquire a lock asynchronously,
    using either a thread (if available) or a subprocess.

    The default behavior is to use a process instead of a thread, since
    there is currently no way to interrupt a thread that is waiting for
    a lock (notably, SIGINT doesn't work because python delivers all
    signals to the main thread).

    Unless _force_async is set, a non-blocking lockfile() call is tried
    first, and a helper task is only started when that call raises
    TryAgain.
    """

    __slots__ = ('path', 'scheduler',) + \
        ('_imp', '_force_async', '_force_dummy', '_force_process', \
        '_force_thread', '_waiting')

    _use_process_by_default = True

    def _start(self):
        """
        Try to take the lock immediately; if that is not possible, start
        a _LockProcess or _LockThread helper that acquires it in the
        background.
        """
        if not self._force_async:
            try:
                self._imp = lockfile(self.path,
                    wantnewlockfile=True, flags=os.O_NONBLOCK)
            except TryAgain:
                # The non-blocking attempt did not succeed, so fall
                # through to the asynchronous helpers below.
                pass
            else:
                # Here _imp holds the value returned by lockfile(),
                # not a task.
                self.returncode = os.EX_OK
                self.wait()
                return

        # A process is used when forced, or when a thread is not forced
        # and either processes are the default or only dummy_threading
        # is available.
        if self._force_process or \
            (not self._force_thread and \
            (self._use_process_by_default or threading is dummy_threading)):
            self._imp = _LockProcess(path=self.path, scheduler=self.scheduler)
        else:
            self._imp = _LockThread(path=self.path,
                scheduler=self.scheduler,
                _force_dummy=self._force_dummy)

        self._imp.addExitListener(self._imp_exit)
        self._imp.start()

    def _imp_exit(self, imp):
        """
        Exit listener for the helper task. When _wait() is already
        driving the helper (_waiting is set) it records the returncode
        itself, so wait() is not called again from here.
        """
        # call exit listeners
        if not self._waiting:
            self.wait()

    def _cancel(self):
        """Cancel the helper task, if one is in use."""
        # After a successful non-blocking acquisition _imp holds the
        # lockfile() return value rather than a task, so only forward
        # the cancel to real helper tasks (the same distinction that
        # unlock() makes). This also covers _imp being None.
        if isinstance(self._imp, (_LockProcess, _LockThread)):
            self._imp.cancel()

    def _wait(self):
        """
        Return the returncode, waiting on the helper task first when it
        has not been set yet.
        """
        if self.returncode is not None:
            return self.returncode
        self._waiting = True
        try:
            self.returncode = self._imp.wait()
        finally:
            # Always reset the flag, so that a failure inside the
            # helper's wait() does not permanently silence _imp_exit().
            self._waiting = False
        return self.returncode

    def unlock(self):
        """
        Release the lock and forget the underlying lock object or helper
        task.

        Raises AssertionError if there is nothing to unlock.
        """
        if self._imp is None:
            raise AssertionError('not locked')
        if isinstance(self._imp, (_LockProcess, _LockThread)):
            self._imp.unlock()
        else:
            # Acquired synchronously in _start(): _imp is the value
            # returned by lockfile().
            unlockfile(self._imp)
        self._imp = None
class _LockThread(AbstractPollTask):
    """
    Acquire a lock via the portage.locks module from a background
    thread. Once the thread holds the lock it writes a byte to a pipe,
    which notifies the poll loop running in the main thread.

    When the threading module is unavailable, dummy_threading is used
    instead and the lock is acquired synchronously (before the start()
    method returns).
    """

    __slots__ = ('path',) + \
        ('_files', '_force_dummy', '_lock_obj',
        '_thread', '_reg_id',)

    def _start(self):
        """
        Create the notification pipe, register its read end with the
        scheduler and launch the thread that takes the lock.
        """
        read_fd, write_fd = os.pipe()
        pipe_files = self._files = {}
        pipe_files['pipe_read'] = os.fdopen(read_fd, 'rb', 0)
        pipe_files['pipe_write'] = os.fdopen(write_fd, 'wb', 0)

        # Switch both ends of the pipe to non-blocking mode.
        for pipe_file in pipe_files.values():
            fd = pipe_file.fileno()
            current_flags = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, current_flags | os.O_NONBLOCK)

        self._reg_id = self.scheduler.register(
            pipe_files['pipe_read'].fileno(),
            PollConstants.POLLIN, self._output_handler)
        self._registered = True

        if self._force_dummy:
            thread_class = dummy_threading.Thread
        else:
            thread_class = threading.Thread
        self._thread = thread_class(target=self._run_lock)
        self._thread.start()

    def _run_lock(self):
        """
        Thread target: take the lock, then write a single byte to the
        pipe to signal that it is held.
        """
        self._lock_obj = lockfile(self.path, wantnewlockfile=True)
        notify_pipe = self._files['pipe_write']
        notify_pipe.write(b'\0')

    def _output_handler(self, f, event):
        """
        Scheduler callback for the read end of the pipe. Any data read
        means the thread has written its notification byte, so the task
        is finished with os.EX_OK.
        """
        data = self._read_buf(self._files['pipe_read'], event)
        if not data:
            return
        self._unregister()
        self.returncode = os.EX_OK
        self.wait()

    def _cancel(self):
        """Do nothing."""
        # There's currently no way to force thread termination.
        pass

    def _wait(self):
        """
        Return the returncode, running the scheduler on the registered
        pipe first when no returncode has been set yet.
        """
        if self.returncode is None and self._registered:
            self.scheduler.schedule(self._reg_id)
        return self.returncode

    def unlock(self):
        """
        Release the lock held by this task.

        Raises AssertionError when no lock object is held or when the
        task has no returncode yet.
        """
        if self._lock_obj is None:
            raise AssertionError('not locked')
        if self.returncode is None:
            raise AssertionError('lock not acquired yet')
        held_lock = self._lock_obj
        unlockfile(held_lock)
        self._lock_obj = None

    def _unregister(self):
        """
        Tear down everything _start() created: join the thread, drop the
        scheduler registration and close both pipe files.
        """
        self._registered = False

        worker = self._thread
        if worker is not None:
            worker.join()
            self._thread = None

        registration = self._reg_id
        if registration is not None:
            self.scheduler.unregister(registration)
            self._reg_id = None

        open_files = self._files
        if open_files is not None:
            for pipe_file in open_files.values():
                pipe_file.close()
            self._files = None
class _LockProcess(AbstractPollTask):
    """
    Acquire a lock via the portage.locks module from a helper
    subprocess. Once the helper holds the lock it writes to a pipe,
    which notifies the poll loop running in the main process. The
    unlock() method notifies the subprocess to release the lock and
    exit.
    """

    __slots__ = ('path', 'scheduler',) + \
        ('_proc', '_files', '_reg_id')

    def _start(self):
        """
        Create the two pipes, register the readable one with the
        scheduler and spawn lock-helper.py with the pipes as its
        stdin and stdout.
        """
        # "from_helper" carries the helper's stdout to this process;
        # "to_helper" is connected to the helper's stdin.
        from_helper_r, from_helper_w = os.pipe()
        to_helper_r, to_helper_w = os.pipe()

        files = self._files = {}
        files['pipe_in'] = os.fdopen(from_helper_r, 'rb', 0)
        files['pipe_out'] = os.fdopen(to_helper_w, 'wb', 0)

        # Only the end that this process reads from is made non-blocking.
        read_flags = fcntl.fcntl(from_helper_r, fcntl.F_GETFL)
        fcntl.fcntl(from_helper_r, fcntl.F_SETFL, read_flags | os.O_NONBLOCK)

        self._reg_id = self.scheduler.register(from_helper_r,
            PollConstants.POLLIN, self._output_handler)
        self._registered = True

        helper_script = os.path.join(portage._bin_path, 'lock-helper.py')
        self._proc = SpawnProcess(
            args=[portage._python_interpreter, helper_script, self.path],
            env=dict(os.environ, PORTAGE_PYM_PATH=portage._pym_path),
            fd_pipes={0: to_helper_r, 1: from_helper_w,
                2: sys.stderr.fileno()},
            scheduler=self.scheduler)
        self._proc.addExitListener(self._proc_exit)
        self._proc.start()

        # Close this process's copies of the descriptors handed to the
        # helper.
        os.close(to_helper_r)
        os.close(from_helper_w)

    def _proc_exit(self, proc):
        """
        Exit listener for the helper process.

        Raises AssertionError when the helper's returncode is not
        os.EX_OK.
        """
        if proc.returncode == os.EX_OK:
            return
        # There's no good reason for locks to fail.
        # NOTE(review): a helper terminated through _cancel() presumably
        # also exits non-zero and would hit this assertion -- confirm
        # whether cancellation should be exempt.
        raise AssertionError('lock process failed with returncode %s' \
            % (proc.returncode,))

    def _cancel(self):
        """Forward the cancel request to the helper process, if any."""
        helper = self._proc
        if helper is not None:
            helper.cancel()

    def _wait(self):
        """
        Return the returncode, running the scheduler on the registered
        pipe first when no returncode has been set yet.
        """
        if self.returncode is None and self._registered:
            self.scheduler.schedule(self._reg_id)
        return self.returncode

    def _output_handler(self, f, event):
        """
        Scheduler callback for the pipe coming from the helper. Any data
        read means the helper has reported in, so the task is finished
        with os.EX_OK.
        """
        data = self._read_buf(self._files['pipe_in'], event)
        if not data:
            return
        self._unregister()
        self.returncode = os.EX_OK
        self.wait()

    def _unregister(self):
        """
        Drop the scheduler registration and close the read pipe. The
        write pipe is left open for unlock().
        """
        self._registered = False

        registration = self._reg_id
        if registration is not None:
            self.scheduler.unregister(registration)
            self._reg_id = None

        if self._files is not None:
            reader = self._files.pop('pipe_in', None)
            if reader is not None:
                reader.close()

    def unlock(self):
        """
        Write a byte to the helper's stdin pipe, close the pipe and wait
        for the helper process to finish.

        Raises AssertionError when there is no helper process or when
        the task has no returncode yet.
        """
        if self._proc is None:
            raise AssertionError('not locked')
        if self.returncode is None:
            raise AssertionError('lock not acquired yet')
        writer = self._files['pipe_out']
        writer.write(b'\0')
        writer.close()
        self._files = None
        self._proc.wait()
        self._proc = None
|