From 5c9bd5c9893e6f852b8dc38c2463f3f7f43122e4 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Wed, 26 Dec 2012 18:31:18 -0800 Subject: EventLoop: thread-safe idle_add and timeout_add This may be useful for using threads to handle blocking IO with Jython, since Jython lacks the fcntl module which is needed for non-blocking IO (see http://bugs.jython.org/issue1074). --- pym/portage/util/_eventloop/EventLoop.py | 165 ++++++++++++++++++------------- 1 file changed, 97 insertions(+), 68 deletions(-) (limited to 'pym/portage/util/_eventloop') diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py index 17a468f28..37e600792 100644 --- a/pym/portage/util/_eventloop/EventLoop.py +++ b/pym/portage/util/_eventloop/EventLoop.py @@ -9,12 +9,23 @@ import select import signal import time +try: + import threading +except ImportError: + import dummy_threading as threading + from portage.util import writemsg_level from ..SlotObject import SlotObject from .PollConstants import PollConstants from .PollSelectAdapter import PollSelectAdapter class EventLoop(object): + """ + An event loop, intended to be compatible with the GLib event loop. + Call the iteration method in order to execute one iteration of the + loop. The idle_add and timeout_add methods serve as thread-safe + means to interact with the loop's thread. + """ supports_multiprocessing = True @@ -44,6 +55,7 @@ class EventLoop(object): @type main: bool """ self._use_signal = main + self._thread_rlock = threading.RLock() self._poll_event_queue = [] self._poll_event_handlers = {} self._poll_event_handler_ids = {} @@ -89,6 +101,14 @@ class EventLoop(object): self._sigchld_src_id = None self._pid = os.getpid() + def _new_source_id(self): + """ + Generate a new source id. This method is thread-safe. + """ + with self._thread_rlock: + self._event_handler_id += 1 + return self._event_handler_id + def _poll(self, timeout=None): """ All poll() calls pass through here. The poll events @@ -199,14 +219,17 @@ class EventLoop(object): return bool(events_handled) def _get_poll_timeout(self): - if self._child_handlers: - if self._timeout_interval is None: - timeout = self._sigchld_interval + + with self._thread_rlock: + if self._child_handlers: + if self._timeout_interval is None: + timeout = self._sigchld_interval + else: + timeout = min(self._sigchld_interval, + self._timeout_interval) else: - timeout = min(self._sigchld_interval, - self._timeout_interval) - else: - timeout = self._timeout_interval + timeout = self._timeout_interval + return timeout def child_watch_add(self, pid, callback, data=None): @@ -229,8 +252,7 @@ class EventLoop(object): @rtype: int @return: an integer ID """ - self._event_handler_id += 1 - source_id = self._event_handler_id + source_id = self._new_source_id() self._child_handlers[source_id] = self._child_callback_class( callback=callback, data=data, pid=pid, source_id=source_id) @@ -304,20 +326,21 @@ class EventLoop(object): """ Like glib.idle_add(), if callback returns False it is automatically removed from the list of event sources and will - not be called again. + not be called again. This method is thread-safe. @type callback: callable @param callback: a function to call @rtype: int @return: an integer ID """ - self._event_handler_id += 1 - source_id = self._event_handler_id - self._idle_callbacks[source_id] = self._idle_callback_class( - args=args, callback=callback, source_id=source_id) + with self._thread_rlock: + source_id = self._new_source_id() + self._idle_callbacks[source_id] = self._idle_callback_class( + args=args, callback=callback, source_id=source_id) return source_id def _run_idle_callbacks(self): + # assumes caller has acquired self._thread_rlock if not self._idle_callbacks: return # Iterate of our local list, since self._idle_callbacks can be @@ -342,16 +365,18 @@ class EventLoop(object): milliseconds between calls to your function, and your function should return False to stop being called, or True to continue being called. Any additional positional arguments given here - are passed to your function when it's called. + are passed to your function when it's called. This method is + thread-safe. """ - self._event_handler_id += 1 - source_id = self._event_handler_id - self._timeout_handlers[source_id] = \ - self._timeout_handler_class( - interval=interval, function=function, args=args, - source_id=source_id, timestamp=time.time()) - if self._timeout_interval is None or self._timeout_interval > interval: - self._timeout_interval = interval + with self._thread_rlock: + source_id = self._new_source_id() + self._timeout_handlers[source_id] = \ + self._timeout_handler_class( + interval=interval, function=function, args=args, + source_id=source_id, timestamp=time.time()) + if self._timeout_interval is None or \ + self._timeout_interval > interval: + self._timeout_interval = interval return source_id def _run_timeouts(self): @@ -361,37 +386,39 @@ class EventLoop(object): if self._poll_child_processes(): calls += 1 - self._run_idle_callbacks() - - if not self._timeout_handlers: - return bool(calls) - - ready_timeouts = [] - current_time = time.time() - for x in self._timeout_handlers.values(): - elapsed_seconds = current_time - x.timestamp - # elapsed_seconds < 0 means the system clock has been adjusted - if elapsed_seconds < 0 or \ - (x.interval - 1000 * elapsed_seconds) <= 0: - ready_timeouts.append(x) - - # Iterate of our local list, since self._timeout_handlers can be - # modified during the exection of these callbacks. - for x in ready_timeouts: - if x.source_id not in self._timeout_handlers: - # it got cancelled while executing another timeout - continue - if x.calling: - # don't call it recursively - continue - calls += 1 - x.calling = True - try: - x.timestamp = time.time() - if not x.function(*x.args): - self.source_remove(x.source_id) - finally: - x.calling = False + with self._thread_rlock: + + self._run_idle_callbacks() + + if not self._timeout_handlers: + return bool(calls) + + ready_timeouts = [] + current_time = time.time() + for x in self._timeout_handlers.values(): + elapsed_seconds = current_time - x.timestamp + # elapsed_seconds < 0 means the system clock has been adjusted + if elapsed_seconds < 0 or \ + (x.interval - 1000 * elapsed_seconds) <= 0: + ready_timeouts.append(x) + + # Iterate of our local list, since self._timeout_handlers can be + # modified during the exection of these callbacks. + for x in ready_timeouts: + if x.source_id not in self._timeout_handlers: + # it got cancelled while executing another timeout + continue + if x.calling: + # don't call it recursively + continue + calls += 1 + x.calling = True + try: + x.timestamp = time.time() + if not x.function(*x.args): + self.source_remove(x.source_id) + finally: + x.calling = False return bool(calls) @@ -413,8 +440,7 @@ class EventLoop(object): """ if f in self._poll_event_handlers: raise AssertionError("fd %d is already registered" % f) - self._event_handler_id += 1 - source_id = self._event_handler_id + source_id = self._new_source_id() self._poll_event_handler_ids[source_id] = f self._poll_event_handlers[f] = self._io_handler_class( args=args, callback=callback, f=f, source_id=source_id) @@ -434,18 +460,21 @@ class EventLoop(object): self.source_remove(self._sigchld_src_id) self._sigchld_src_id = None return True - idle_callback = self._idle_callbacks.pop(reg_id, None) - if idle_callback is not None: - return True - timeout_handler = self._timeout_handlers.pop(reg_id, None) - if timeout_handler is not None: - if timeout_handler.interval == self._timeout_interval: - if self._timeout_handlers: - self._timeout_interval = \ - min(x.interval for x in self._timeout_handlers.values()) - else: - self._timeout_interval = None - return True + + with self._thread_rlock: + idle_callback = self._idle_callbacks.pop(reg_id, None) + if idle_callback is not None: + return True + timeout_handler = self._timeout_handlers.pop(reg_id, None) + if timeout_handler is not None: + if timeout_handler.interval == self._timeout_interval: + if self._timeout_handlers: + self._timeout_interval = min(x.interval + for x in self._timeout_handlers.values()) + else: + self._timeout_interval = None + return True + f = self._poll_event_handler_ids.pop(reg_id, None) if f is None: return False -- cgit v1.2.3-1-g7c22