summaryrefslogtreecommitdiffstats
path: root/pym/portage
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2012-12-26 18:31:18 -0800
committerZac Medico <zmedico@gentoo.org>2012-12-26 18:39:20 -0800
commit5c9bd5c9893e6f852b8dc38c2463f3f7f43122e4 (patch)
tree0c09c3e21c47b450d0389d0b868c6eb8e80a1209 /pym/portage
parentd93959f2f295b47a0b17c33ee8df1633a0e479ee (diff)
downloadportage-5c9bd5c9893e6f852b8dc38c2463f3f7f43122e4.tar.gz
portage-5c9bd5c9893e6f852b8dc38c2463f3f7f43122e4.tar.bz2
portage-5c9bd5c9893e6f852b8dc38c2463f3f7f43122e4.zip
EventLoop: thread-safe idle_add and timeout_add
This may be useful for using threads to handle blocking IO with Jython, since Jython lacks the fcntl module which is needed for non-blocking IO (see http://bugs.jython.org/issue1074).
Diffstat (limited to 'pym/portage')
-rw-r--r--pym/portage/util/_eventloop/EventLoop.py165
1 files changed, 97 insertions, 68 deletions
diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py
index 17a468f28..37e600792 100644
--- a/pym/portage/util/_eventloop/EventLoop.py
+++ b/pym/portage/util/_eventloop/EventLoop.py
@@ -9,12 +9,23 @@ import select
import signal
import time
+try:
+ import threading
+except ImportError:
+ import dummy_threading as threading
+
from portage.util import writemsg_level
from ..SlotObject import SlotObject
from .PollConstants import PollConstants
from .PollSelectAdapter import PollSelectAdapter
class EventLoop(object):
+ """
+ An event loop, intended to be compatible with the GLib event loop.
+ Call the iteration method in order to execute one iteration of the
+ loop. The idle_add and timeout_add methods serve as thread-safe
+ means to interact with the loop's thread.
+ """
supports_multiprocessing = True
@@ -44,6 +55,7 @@ class EventLoop(object):
@type main: bool
"""
self._use_signal = main
+ self._thread_rlock = threading.RLock()
self._poll_event_queue = []
self._poll_event_handlers = {}
self._poll_event_handler_ids = {}
@@ -89,6 +101,14 @@ class EventLoop(object):
self._sigchld_src_id = None
self._pid = os.getpid()
+ def _new_source_id(self):
+ """
+ Generate a new source id. This method is thread-safe.
+ """
+ with self._thread_rlock:
+ self._event_handler_id += 1
+ return self._event_handler_id
+
def _poll(self, timeout=None):
"""
All poll() calls pass through here. The poll events
@@ -199,14 +219,17 @@ class EventLoop(object):
return bool(events_handled)
def _get_poll_timeout(self):
- if self._child_handlers:
- if self._timeout_interval is None:
- timeout = self._sigchld_interval
+
+ with self._thread_rlock:
+ if self._child_handlers:
+ if self._timeout_interval is None:
+ timeout = self._sigchld_interval
+ else:
+ timeout = min(self._sigchld_interval,
+ self._timeout_interval)
else:
- timeout = min(self._sigchld_interval,
- self._timeout_interval)
- else:
- timeout = self._timeout_interval
+ timeout = self._timeout_interval
+
return timeout
def child_watch_add(self, pid, callback, data=None):
@@ -229,8 +252,7 @@ class EventLoop(object):
@rtype: int
@return: an integer ID
"""
- self._event_handler_id += 1
- source_id = self._event_handler_id
+ source_id = self._new_source_id()
self._child_handlers[source_id] = self._child_callback_class(
callback=callback, data=data, pid=pid, source_id=source_id)
@@ -304,20 +326,21 @@ class EventLoop(object):
"""
Like glib.idle_add(), if callback returns False it is
automatically removed from the list of event sources and will
- not be called again.
+ not be called again. This method is thread-safe.
@type callback: callable
@param callback: a function to call
@rtype: int
@return: an integer ID
"""
- self._event_handler_id += 1
- source_id = self._event_handler_id
- self._idle_callbacks[source_id] = self._idle_callback_class(
- args=args, callback=callback, source_id=source_id)
+ with self._thread_rlock:
+ source_id = self._new_source_id()
+ self._idle_callbacks[source_id] = self._idle_callback_class(
+ args=args, callback=callback, source_id=source_id)
return source_id
def _run_idle_callbacks(self):
+ # assumes caller has acquired self._thread_rlock
if not self._idle_callbacks:
return
# Iterate of our local list, since self._idle_callbacks can be
@@ -342,16 +365,18 @@ class EventLoop(object):
milliseconds between calls to your function, and your function
should return False to stop being called, or True to continue
being called. Any additional positional arguments given here
- are passed to your function when it's called.
+ are passed to your function when it's called. This method is
+ thread-safe.
"""
- self._event_handler_id += 1
- source_id = self._event_handler_id
- self._timeout_handlers[source_id] = \
- self._timeout_handler_class(
- interval=interval, function=function, args=args,
- source_id=source_id, timestamp=time.time())
- if self._timeout_interval is None or self._timeout_interval > interval:
- self._timeout_interval = interval
+ with self._thread_rlock:
+ source_id = self._new_source_id()
+ self._timeout_handlers[source_id] = \
+ self._timeout_handler_class(
+ interval=interval, function=function, args=args,
+ source_id=source_id, timestamp=time.time())
+ if self._timeout_interval is None or \
+ self._timeout_interval > interval:
+ self._timeout_interval = interval
return source_id
def _run_timeouts(self):
@@ -361,37 +386,39 @@ class EventLoop(object):
if self._poll_child_processes():
calls += 1
- self._run_idle_callbacks()
-
- if not self._timeout_handlers:
- return bool(calls)
-
- ready_timeouts = []
- current_time = time.time()
- for x in self._timeout_handlers.values():
- elapsed_seconds = current_time - x.timestamp
- # elapsed_seconds < 0 means the system clock has been adjusted
- if elapsed_seconds < 0 or \
- (x.interval - 1000 * elapsed_seconds) <= 0:
- ready_timeouts.append(x)
-
- # Iterate of our local list, since self._timeout_handlers can be
- # modified during the exection of these callbacks.
- for x in ready_timeouts:
- if x.source_id not in self._timeout_handlers:
- # it got cancelled while executing another timeout
- continue
- if x.calling:
- # don't call it recursively
- continue
- calls += 1
- x.calling = True
- try:
- x.timestamp = time.time()
- if not x.function(*x.args):
- self.source_remove(x.source_id)
- finally:
- x.calling = False
+ with self._thread_rlock:
+
+ self._run_idle_callbacks()
+
+ if not self._timeout_handlers:
+ return bool(calls)
+
+ ready_timeouts = []
+ current_time = time.time()
+ for x in self._timeout_handlers.values():
+ elapsed_seconds = current_time - x.timestamp
+ # elapsed_seconds < 0 means the system clock has been adjusted
+ if elapsed_seconds < 0 or \
+ (x.interval - 1000 * elapsed_seconds) <= 0:
+ ready_timeouts.append(x)
+
+ # Iterate of our local list, since self._timeout_handlers can be
+ # modified during the exection of these callbacks.
+ for x in ready_timeouts:
+ if x.source_id not in self._timeout_handlers:
+ # it got cancelled while executing another timeout
+ continue
+ if x.calling:
+ # don't call it recursively
+ continue
+ calls += 1
+ x.calling = True
+ try:
+ x.timestamp = time.time()
+ if not x.function(*x.args):
+ self.source_remove(x.source_id)
+ finally:
+ x.calling = False
return bool(calls)
@@ -413,8 +440,7 @@ class EventLoop(object):
"""
if f in self._poll_event_handlers:
raise AssertionError("fd %d is already registered" % f)
- self._event_handler_id += 1
- source_id = self._event_handler_id
+ source_id = self._new_source_id()
self._poll_event_handler_ids[source_id] = f
self._poll_event_handlers[f] = self._io_handler_class(
args=args, callback=callback, f=f, source_id=source_id)
@@ -434,18 +460,21 @@ class EventLoop(object):
self.source_remove(self._sigchld_src_id)
self._sigchld_src_id = None
return True
- idle_callback = self._idle_callbacks.pop(reg_id, None)
- if idle_callback is not None:
- return True
- timeout_handler = self._timeout_handlers.pop(reg_id, None)
- if timeout_handler is not None:
- if timeout_handler.interval == self._timeout_interval:
- if self._timeout_handlers:
- self._timeout_interval = \
- min(x.interval for x in self._timeout_handlers.values())
- else:
- self._timeout_interval = None
- return True
+
+ with self._thread_rlock:
+ idle_callback = self._idle_callbacks.pop(reg_id, None)
+ if idle_callback is not None:
+ return True
+ timeout_handler = self._timeout_handlers.pop(reg_id, None)
+ if timeout_handler is not None:
+ if timeout_handler.interval == self._timeout_interval:
+ if self._timeout_handlers:
+ self._timeout_interval = min(x.interval
+ for x in self._timeout_handlers.values())
+ else:
+ self._timeout_interval = None
+ return True
+
f = self._poll_event_handler_ids.pop(reg_id, None)
if f is None:
return False