From 6afd0e508eaf1f9040a20ed670cd6cf7a3a07517 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Mon, 13 Feb 2012 17:22:56 -0800 Subject: EventLoop: make _poll/_run_timeouts re-entrant This fixes infinite loops triggered by Ctrl-C, where timeout calls would exhaust the poll event queue because _poll was not re-entrant. Now, re-entrance is only prohibited for individual callback functions, in order to protect against infinite recursion. --- pym/portage/util/_eventloop/EventLoop.py | 135 +++++++++++++++++-------------- 1 file changed, 73 insertions(+), 62 deletions(-) (limited to 'pym/portage/util/_eventloop') diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py index 0da388d03..6c6a1b75d 100644 --- a/pym/portage/util/_eventloop/EventLoop.py +++ b/pym/portage/util/_eventloop/EventLoop.py @@ -16,13 +16,13 @@ class EventLoop(object): supports_multiprocessing = True class _idle_callback_class(SlotObject): - __slots__ = ("args", "callback", "source_id") + __slots__ = ("args", "callback", "calling", "source_id") class _io_handler_class(SlotObject): - __slots__ = ("args", "callback", "fd", "source_id") + __slots__ = ("args", "callback", "calling", "fd", "source_id") class _timeout_handler_class(SlotObject): - __slots__ = ("args", "function", "interval", "source_id", + __slots__ = ("args", "function", "calling", "interval", "source_id", "timestamp") def __init__(self): @@ -35,7 +35,6 @@ class EventLoop(object): self._timeout_handlers = {} self._timeout_interval = None self._poll_obj = create_poll_instance() - self._polling = False self.IO_ERR = PollConstants.POLLERR self.IO_HUP = PollConstants.POLLHUP @@ -45,53 +44,47 @@ class EventLoop(object): self.IO_PRI = PollConstants.POLLPRI def _poll(self, timeout=None): - if self._polling: - return - self._polling = True - try: - if self._timeout_interval is None: + if self._timeout_interval is None: + self._run_timeouts() + self._do_poll(timeout=timeout) + + elif timeout is None: + while True: self._run_timeouts() - self._do_poll(timeout=timeout) + previous_count = len(self._poll_event_queue) + self._do_poll(timeout=self._timeout_interval) + if previous_count != len(self._poll_event_queue): + break - elif timeout is None: - while True: - self._run_timeouts() - previous_count = len(self._poll_event_queue) - self._do_poll(timeout=self._timeout_interval) - if previous_count != len(self._poll_event_queue): - break + elif timeout <= self._timeout_interval: + self._run_timeouts() + self._do_poll(timeout=timeout) - elif timeout <= self._timeout_interval: + else: + remaining_timeout = timeout + start_time = time.time() + while True: self._run_timeouts() - self._do_poll(timeout=timeout) - - else: - remaining_timeout = timeout - start_time = time.time() - while True: - self._run_timeouts() - # _timeout_interval can change each time - # _run_timeouts is called - min_timeout = remaining_timeout - if self._timeout_interval is not None and \ - self._timeout_interval < min_timeout: - min_timeout = self._timeout_interval - - previous_count = len(self._poll_event_queue) - self._do_poll(timeout=min_timeout) - if previous_count != len(self._poll_event_queue): - break - elapsed_time = time.time() - start_time - if elapsed_time < 0: - # The system clock has changed such that start_time - # is now in the future, so just assume that the - # timeout has already elapsed. - break - remaining_timeout = timeout - 1000 * elapsed_time - if remaining_timeout <= 0: - break - finally: - self._polling = False + # _timeout_interval can change each time + # _run_timeouts is called + min_timeout = remaining_timeout + if self._timeout_interval is not None and \ + self._timeout_interval < min_timeout: + min_timeout = self._timeout_interval + + previous_count = len(self._poll_event_queue) + self._do_poll(timeout=min_timeout) + if previous_count != len(self._poll_event_queue): + break + elapsed_time = time.time() - start_time + if elapsed_time < 0: + # The system clock has changed such that start_time + # is now in the future, so just assume that the + # timeout has already elapsed. + break + remaining_timeout = timeout - 1000 * elapsed_time + if remaining_timeout <= 0: + break def _do_poll(self, timeout=None): """ @@ -162,13 +155,8 @@ class EventLoop(object): events_handled = 0 if not event_handlers: - if not self._polling: - self._polling = True - try: - if self._run_timeouts(): - events_handled += 1 - finally: - self._polling = False + if self._run_timeouts(): + events_handled += 1 if not event_handlers: return bool(events_handled) @@ -189,9 +177,16 @@ class EventLoop(object): while event_handlers and self._poll_event_queue: f, event = self._next_poll_event() x = event_handlers[f] - if not x.callback(f, event, *x.args): - self.source_remove(x.source_id) + if x.calling: + # don't call it recursively + continue events_handled += 1 + x.calling = True + try: + if not x.callback(f, event, *x.args): + self.source_remove(x.source_id) + finally: + x.calling = False except StopIteration: events_handled += 1 @@ -223,8 +218,15 @@ class EventLoop(object): if x.source_id not in self._idle_callbacks: # it got cancelled while executing another callback continue - if not x.callback(*x.args): - self.source_remove(x.source_id) + if x.calling: + # don't call it recursively + continue + x.calling = True + try: + if not x.callback(*x.args): + self.source_remove(x.source_id) + finally: + x.calling = False def timeout_add(self, interval, function, *args): """ @@ -262,15 +264,24 @@ class EventLoop(object): # Iterate of our local list, since self._timeout_handlers can be # modified during the exection of these callbacks. + calls = 0 for x in ready_timeouts: if x.source_id not in self._timeout_handlers: # it got cancelled while executing another timeout continue - x.timestamp = time.time() - if not x.function(*x.args): - self.source_remove(x.source_id) + if x.calling: + # don't call it recursively + continue + calls += 1 + x.calling = True + try: + x.timestamp = time.time() + if not x.function(*x.args): + self.source_remove(x.source_id) + finally: + x.calling = False - return bool(ready_timeouts) + return bool(calls) def io_add_watch(self, f, condition, callback, *args): """ -- cgit v1.2.3-1-g7c22