From 0166ba48b976f7e7262f59df0b3d0b499d0f2a63 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Thu, 16 Feb 2012 12:56:50 -0800 Subject: EventLoop: implement child_watch_add --- pym/_emerge/PollScheduler.py | 4 +- pym/portage/util/_eventloop/EventLoop.py | 95 ++++++++++++++++++++++++++++ pym/portage/util/_eventloop/GlibEventLoop.py | 1 + 3 files changed, 99 insertions(+), 1 deletion(-) diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py index 6e416c300..be41565dd 100644 --- a/pym/_emerge/PollScheduler.py +++ b/pym/_emerge/PollScheduler.py @@ -21,7 +21,8 @@ class PollScheduler(object): class _sched_iface_class(SlotObject): __slots__ = ("IO_ERR", "IO_HUP", "IO_IN", "IO_NVAL", "IO_OUT", - "IO_PRI", "idle_add", "io_add_watch", "iteration", + "IO_PRI", "child_watch_add", + "idle_add", "io_add_watch", "iteration", "output", "register", "run", "source_remove", "timeout_add", "unregister") @@ -41,6 +42,7 @@ class PollScheduler(object): IO_NVAL=self._event_loop.IO_NVAL, IO_OUT=self._event_loop.IO_OUT, IO_PRI=self._event_loop.IO_PRI, + child_watch_add=self._event_loop.child_watch_add, idle_add=self._event_loop.idle_add, io_add_watch=self._event_loop.io_add_watch, iteration=self._event_loop.iteration, diff --git a/pym/portage/util/_eventloop/EventLoop.py b/pym/portage/util/_eventloop/EventLoop.py index 24ba14077..994952e19 100644 --- a/pym/portage/util/_eventloop/EventLoop.py +++ b/pym/portage/util/_eventloop/EventLoop.py @@ -2,8 +2,11 @@ # Distributed under the terms of the GNU General Public License v2 import errno +import fcntl import logging +import os import select +import signal import time from portage.util import writemsg_level @@ -16,6 +19,9 @@ class EventLoop(object): supports_multiprocessing = True + class _child_callback_class(SlotObject): + __slots__ = ("callback", "data", "pid", "source_id") + class _idle_callback_class(SlotObject): __slots__ = ("args", "callback", "calling", "source_id") @@ -44,6 +50,11 @@ class EventLoop(object): self.IO_OUT = PollConstants.POLLOUT self.IO_PRI = PollConstants.POLLPRI + self._child_handlers = {} + self._sigchld_read = None + self._sigchld_write = None + self._pid = os.getpid() + def _poll(self, timeout=None): if self._timeout_interval is None: self._run_timeouts() @@ -200,6 +211,87 @@ class EventLoop(object): return bool(events_handled) + def child_watch_add(self, pid, callback, data=None): + """ + Like glib.child_watch_add(), sets callback to be called with the + user data specified by data when the child indicated by pid exits. + The signature for the callback is: + + def callback(pid, condition, user_data) + + where pid is is the child process id, condition is the status + information about the child process and user_data is data. + + @type int + @param pid: process id of a child process to watch + @type callback: callable + @param callback: a function to call + @type data: object + @param data: the optional data to pass to function + @rtype: int + @return: an integer ID + """ + self._event_handler_id += 1 + source_id = self._event_handler_id + self._child_handlers[source_id] = self._child_callback_class( + callback=callback, data=data, pid=pid, source_id=source_id) + if self._sigchld_read is None: + self._sigchld_init() + # poll now, in case the SIGCHLD has already arrived + self._poll_child_processes() + return source_id + + def _sigchld_init(self): + signal.signal(signal.SIGCHLD, self._sigchld_sig_cb) + self._sigchld_read, self._sigchld_write = os.pipe() + fcntl.fcntl(self._sigchld_read, fcntl.F_SETFL, + fcntl.fcntl(self._sigchld_read, fcntl.F_GETFL) | os.O_NONBLOCK) + self.io_add_watch(self._sigchld_read, self.IO_IN, self._sigchld_io_cb) + + def _sigchld_sig_cb(self, signum, frame): + # If this signal handler was not installed by the + # current process then the signal doesn't belong to + # this EventLoop instance. + if os.getpid() == self._pid: + os.write(self._sigchld_write, b'\0') + + def _sigchld_io_cb(self, fd, events): + try: + while True: + os.read(self._sigchld_read, 4096) + except OSError: + # read until EAGAIN + pass + self._poll_child_processes() + + def _poll_child_processes(self): + if not self._child_handlers: + return False + + calls = 0 + + for x in list(self._child_handlers.values()): + if x.source_id not in self._child_handlers: + # it's already been called via re-entrance + continue + try: + wait_retval = os.waitpid(x.pid, os.WNOHANG) + except OSError as e: + if e.errno != errno.ECHILD: + raise + del e + self._child_handlers.pop(x.source_id, None) + else: + # With waitpid and WNOHANG, only check the + # first element of the tuple since the second + # element may vary (bug #337465). + if wait_retval[0] != 0: + calls += 1 + self._child_handlers.pop(x.source_id, None) + x.callback(x.pid, wait_retval[1], x.data) + + return bool(calls) + def idle_add(self, callback, *args): """ Like glib.idle_add(), if callback returns False it is @@ -323,6 +415,9 @@ class EventLoop(object): is found and removed, and False if the reg_id is invalid or has already been removed. """ + x = self._child_handlers.pop(reg_id, None) + if x is not None: + return True idle_callback = self._idle_callbacks.pop(reg_id, None) if idle_callback is not None: return True diff --git a/pym/portage/util/_eventloop/GlibEventLoop.py b/pym/portage/util/_eventloop/GlibEventLoop.py index 0d33175ea..f2f5c5e64 100644 --- a/pym/portage/util/_eventloop/GlibEventLoop.py +++ b/pym/portage/util/_eventloop/GlibEventLoop.py @@ -16,6 +16,7 @@ class GlibEventLoop(object): self.IO_OUT = glib.IO_OUT self.IO_PRI = glib.IO_PRI self.iteration = glib.main_context_default().iteration + self.child_watch_add = glib.child_watch_add self.idle_add = glib.idle_add self.io_add_watch = glib.io_add_watch self.timeout_add = glib.timeout_add -- cgit v1.2.3-1-g7c22