1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
|
# Copyright 1999-2012 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2
import gzip
import errno
try:
import threading
except ImportError:
import dummy_threading as threading
from portage import _encodings
from portage import _unicode_encode
from portage.util import writemsg_level
from portage.util._eventloop.EventLoop import EventLoop
from _emerge.SlotObject import SlotObject
from _emerge.getloadavg import getloadavg
class PollScheduler(object):
	"""
	Base class for event-loop driven job schedulers. Subclasses
	implement _schedule_tasks() / _terminate_tasks() and use the
	sched_iface attribute to hand tasks a restricted view of the
	underlying EventLoop.
	"""

	class _sched_iface_class(SlotObject):
		# Narrow "scheduler interface" passed to tasks instead of the
		# scheduler itself. Several slots are legacy aliases for the
		# same EventLoop methods (register/io_add_watch,
		# unregister/source_remove) — see __init__ below.
		__slots__ = ("idle_add", "io_add_watch", "iteration",
			"output", "register", "run",
			"source_remove", "timeout_add", "unregister")

	def __init__(self):
		# Set asynchronously (thread / signal-handler safe) to request
		# graceful shutdown; polled from _schedule().
		self._terminated = threading.Event()
		# Becomes True once _terminate_tasks() has been dispatched, so
		# that termination is only triggered once.
		self._terminated_tasks = False
		# Maximum concurrent jobs; the special value True means
		# "unlimited" (see _can_add_job).
		self._max_jobs = 1
		# Load-average ceiling for starting new jobs, or None for no
		# load limit.
		self._max_load = None
		# Count of currently running jobs (subclasses maintain this).
		self._jobs = 0
		# Re-entrancy guard for _schedule().
		self._scheduling = False
		# Global "background" flag used as the default for
		# _task_output when a task has no local value.
		self._background = False
		self._event_loop = EventLoop()
		self.sched_iface = self._sched_iface_class(
			idle_add=self._event_loop.idle_add,
			io_add_watch=self._event_loop.io_add_watch,
			iteration=self._event_loop.iteration,
			output=self._task_output,
			# register/unregister are legacy aliases kept for
			# backward compatibility with older task code.
			register=self._event_loop.io_add_watch,
			source_remove=self._event_loop.source_remove,
			timeout_add=self._event_loop.timeout_add,
			unregister=self._event_loop.source_remove)

	def terminate(self):
		"""
		Schedules asynchronous, graceful termination of the scheduler
		at the earliest opportunity.

		This method is thread-safe (and safe for signal handlers).
		"""
		self._terminated.set()

	def _terminate_tasks(self):
		"""
		Send signals to terminate all tasks. This is called once
		from self._schedule() in the event dispatching thread. This
		prevents it from being called while the _schedule_tasks()
		implementation is running, in order to avoid potential
		interference. All tasks should be cleaned up at the earliest
		opportunity, but not necessarily before this method returns.

		Subclasses must override this method.
		"""
		raise NotImplementedError()

	def _schedule_tasks(self):
		"""
		This is called from inside the _schedule() method, which
		guarantees the following:

		1) It will not be called recursively.
		2) _terminate_tasks() will not be called while it is running.
		3) The state of the boolean _terminated_tasks variable will
		   not change while it is running.

		Unless this method is used to perform user interface updates,
		or something like that, the first thing it should do is check
		the state of _terminated_tasks and if that is True then it
		should return False immediately (since there's no need to
		schedule anything after _terminate_tasks() has been called).
		"""
		pass

	def _schedule(self):
		"""
		Calls _schedule_tasks() and automatically returns early from
		any recursive calls to this method that the _schedule_tasks()
		call might trigger. This makes _schedule() safe to call from
		inside exit listeners.

		@return: False when a recursive call is suppressed, otherwise
			whatever _schedule_tasks() returns
		"""
		if self._scheduling:
			return False
		self._scheduling = True
		try:
			# Trigger task termination exactly once, after terminate()
			# has been requested from another thread/signal handler.
			if self._terminated.is_set() and \
				not self._terminated_tasks:
				self._terminated_tasks = True
				self._terminate_tasks()
			return self._schedule_tasks()
		finally:
			self._scheduling = False

	def _running_job_count(self):
		# Subclasses may override this with a more accurate count.
		return self._jobs

	def _can_add_job(self):
		"""
		Return True if a new job may be started now, considering the
		termination state, the configured job limit, and (when more
		than one job is allowed) the 1-minute load average.
		"""
		if self._terminated_tasks:
			return False

		max_jobs = self._max_jobs
		max_load = self._max_load

		# max_jobs is True means "unlimited jobs".
		if max_jobs is not True and \
			self._running_job_count() >= max_jobs:
			return False

		# Only apply the load limit when parallelism is actually
		# allowed and at least one job is already running (so that a
		# high load can never starve the scheduler completely).
		if max_load is not None and \
			(max_jobs is True or max_jobs > 1) and \
			self._running_job_count() >= 1:
			try:
				avg1, avg5, avg15 = getloadavg()
			except OSError:
				# Load average unobtainable; be conservative.
				return False

			if avg1 >= max_load:
				return False

		return True

	def _task_output(self, msg, log_path=None, background=None,
		level=0, noiselevel=-1):
		"""
		Output msg to stdout if not self._background. If log_path
		is not None then append msg to the log (appends with
		compression if the filename extension of log_path
		corresponds to a supported compression type).

		@param msg: message text
		@param log_path: path of the log file to append to, or None
		@param background: local background flag for this task, or
			None to fall back to the global self._background value
		@param level: passed through to writemsg_level
		@param noiselevel: passed through to writemsg_level
		"""
		if background is None:
			# If the task does not have a local background value
			# (like for parallel-fetch), then use the global value.
			background = self._background

		msg_shown = False
		if not background:
			writemsg_level(msg, level=level, noiselevel=noiselevel)
			msg_shown = True

		if log_path is not None:
			try:
				f = open(_unicode_encode(log_path,
					encoding=_encodings['fs'], errors='strict'),
					mode='ab')
				f_real = f
			except IOError as e:
				# A missing or stale log file is tolerated; make sure
				# the message is shown at least once in that case.
				if e.errno not in (errno.ENOENT, errno.ESTALE):
					raise
				if not msg_shown:
					writemsg_level(msg, level=level, noiselevel=noiselevel)
			else:
				# BUGFIX: close the file object(s) even if the write
				# (or GzipFile construction) raises, instead of
				# leaking the descriptor.
				try:
					if log_path.endswith('.gz'):
						# NOTE: The empty filename argument prevents us from
						# triggering a bug in python3 which causes GzipFile
						# to raise AttributeError if fileobj.name is bytes
						# instead of unicode.
						f = gzip.GzipFile(filename='', mode='ab', fileobj=f)

					f.write(_unicode_encode(msg))
				finally:
					# Close the gzip wrapper first (flushes compressed
					# data), then the underlying file if distinct.
					f.close()
					if f_real is not f:
						f_real.close()
|