From b41e88203cb308eb79f15961a0508b86209e2dfb Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Wed, 8 Feb 2012 12:24:22 -0800 Subject: PollScheduler: split out EventLoop base class --- pym/_emerge/PollScheduler.py | 222 ++++++++++++++++++++++--------------------- 1 file changed, 116 insertions(+), 106 deletions(-) (limited to 'pym/_emerge') diff --git a/pym/_emerge/PollScheduler.py b/pym/_emerge/PollScheduler.py index 0e510b37b..983bfa9cc 100644 --- a/pym/_emerge/PollScheduler.py +++ b/pym/_emerge/PollScheduler.py @@ -21,12 +21,8 @@ from _emerge.getloadavg import getloadavg from _emerge.PollConstants import PollConstants from _emerge.PollSelectAdapter import PollSelectAdapter -class PollScheduler(object): - class _sched_iface_class(SlotObject): - __slots__ = ("idle_add", "io_add_watch", "iteration", - "output", "register", "schedule", - "source_remove", "timeout_add", "unregister") +class EventLoop(object): class _idle_callback_class(SlotObject): __slots__ = ("args", "callback", "source_id") @@ -39,11 +35,6 @@ class PollScheduler(object): "timestamp") def __init__(self): - self._terminated = threading.Event() - self._terminated_tasks = False - self._max_jobs = 1 - self._max_load = None - self._jobs = 0 self._poll_event_queue = [] self._poll_event_handlers = {} self._poll_event_handler_ids = {} @@ -53,105 +44,11 @@ class PollScheduler(object): self._timeout_handlers = {} self._timeout_interval = None self._poll_obj = create_poll_instance() - self._polling = False - self._scheduling = False - self._background = False - self.sched_iface = self._sched_iface_class( - idle_add=self._idle_add, - io_add_watch=self._register, - iteration=self._iteration, - output=self._task_output, - register=self._register, - schedule=self._poll_loop, - source_remove=self._unregister, - timeout_add=self._timeout_add, - unregister=self._unregister) - - def terminate(self): - """ - Schedules asynchronous, graceful termination of the scheduler - at the earliest opportunity. - - This method is thread-safe (and safe for signal handlers). - """ - self._terminated.set() - - def _terminate_tasks(self): - """ - Send signals to terminate all tasks. This is called once - from self._schedule() in the event dispatching thread. This - prevents it from being called while the _schedule_tasks() - implementation is running, in order to avoid potential - interference. All tasks should be cleaned up at the earliest - opportunity, but not necessarily before this method returns. - """ - raise NotImplementedError() - - def _schedule_tasks(self): - """ - This is called from inside the _schedule() method, which - guarantees the following: - - 1) It will not be called recursively. - 2) _terminate_tasks() will not be called while it is running. - 3) The state of the boolean _terminated_tasks variable will - not change while it is running. - - Unless this method is used to perform user interface updates, - or something like that, the first thing it should do is check - the state of _terminated_tasks and if that is True then it - should return False immediately (since there's no need to - schedule anything after _terminate_tasks() has been called). - """ - pass + self._polling = False def _schedule(self): - """ - Calls _schedule_tasks() and automatically returns early from - any recursive calls to this method that the _schedule_tasks() - call might trigger. This makes _schedule() safe to call from - inside exit listeners. - """ - if self._scheduling: - return False - self._scheduling = True - try: - - if self._terminated.is_set() and \ - not self._terminated_tasks: - self._terminated_tasks = True - self._terminate_tasks() - - return self._schedule_tasks() - finally: - self._scheduling = False - - def _running_job_count(self): - return self._jobs - - def _can_add_job(self): - if self._terminated_tasks: - return False - - max_jobs = self._max_jobs - max_load = self._max_load - - if self._max_jobs is not True and \ - self._running_job_count() >= self._max_jobs: - return False - - if max_load is not None and \ - (max_jobs is True or max_jobs > 1) and \ - self._running_job_count() >= 1: - try: - avg1, avg5, avg15 = getloadavg() - except OSError: - return False - - if avg1 >= max_load: - return False + pass - return True def _poll(self, timeout=None): if self._polling: @@ -462,6 +359,119 @@ class PollScheduler(object): del self._poll_event_handlers[f] return True +class PollScheduler(EventLoop): + + class _sched_iface_class(SlotObject): + __slots__ = ("idle_add", "io_add_watch", "iteration", + "output", "register", "schedule", + "source_remove", "timeout_add", "unregister") + + def __init__(self): + super(PollScheduler, self).__init__() + self._terminated = threading.Event() + self._terminated_tasks = False + self._max_jobs = 1 + self._max_load = None + self._jobs = 0 + self._scheduling = False + self._background = False + self.sched_iface = self._sched_iface_class( + idle_add=self._idle_add, + io_add_watch=self._register, + iteration=self._iteration, + output=self._task_output, + register=self._register, + schedule=self._poll_loop, + source_remove=self._unregister, + timeout_add=self._timeout_add, + unregister=self._unregister) + + def terminate(self): + """ + Schedules asynchronous, graceful termination of the scheduler + at the earliest opportunity. + + This method is thread-safe (and safe for signal handlers). + """ + self._terminated.set() + + def _terminate_tasks(self): + """ + Send signals to terminate all tasks. This is called once + from self._schedule() in the event dispatching thread. This + prevents it from being called while the _schedule_tasks() + implementation is running, in order to avoid potential + interference. All tasks should be cleaned up at the earliest + opportunity, but not necessarily before this method returns. + """ + raise NotImplementedError() + + def _schedule_tasks(self): + """ + This is called from inside the _schedule() method, which + guarantees the following: + + 1) It will not be called recursively. + 2) _terminate_tasks() will not be called while it is running. + 3) The state of the boolean _terminated_tasks variable will + not change while it is running. + + Unless this method is used to perform user interface updates, + or something like that, the first thing it should do is check + the state of _terminated_tasks and if that is True then it + should return False immediately (since there's no need to + schedule anything after _terminate_tasks() has been called). + """ + pass + + def _schedule(self): + """ + Calls _schedule_tasks() and automatically returns early from + any recursive calls to this method that the _schedule_tasks() + call might trigger. This makes _schedule() safe to call from + inside exit listeners. + """ + if self._scheduling: + return False + self._scheduling = True + try: + + if self._terminated.is_set() and \ + not self._terminated_tasks: + self._terminated_tasks = True + self._terminate_tasks() + + return self._schedule_tasks() + finally: + self._scheduling = False + + def _running_job_count(self): + return self._jobs + + def _can_add_job(self): + if self._terminated_tasks: + return False + + max_jobs = self._max_jobs + max_load = self._max_load + + if self._max_jobs is not True and \ + self._running_job_count() >= self._max_jobs: + return False + + if max_load is not None and \ + (max_jobs is True or max_jobs > 1) and \ + self._running_job_count() >= 1: + try: + avg1, avg5, avg15 = getloadavg() + except OSError: + return False + + if avg1 >= max_load: + return False + + return True + def _task_output(self, msg, log_path=None, background=None, level=0, noiselevel=-1): """ -- cgit v1.2.3-1-g7c22