From 49766b22882fc216dcf884fa5c13cc82979f9330 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Fri, 4 Jul 2008 06:15:43 +0000 Subject: Change the way the way things that have to call the scheduler interact with it: * Return a unique integer id from scheduler.register(), to be passed back into other scheduler methods. * Control handler unregistration with the handler's return value, like some other frameworks do for similar callbacks. * Add a SpawnProcess.reg_id attribute to store the id returned from scheduler.register() * Pass the SpawnProcess.reg_id value into scheduler.schedule() calls, so the scheduler knows to return when the callback referred to by the given id unregisters itself by returning False. svn path=/main/trunk/; revision=10921 --- pym/_emerge/__init__.py | 116 +++++++++++++++++++++++++----------------------- 1 file changed, 60 insertions(+), 56 deletions(-) (limited to 'pym') diff --git a/pym/_emerge/__init__.py b/pym/_emerge/__init__.py index b02d347bc..a903e52a6 100644 --- a/pym/_emerge/__init__.py +++ b/pym/_emerge/__init__.py @@ -1507,7 +1507,7 @@ class SpawnProcess(SubProcess): "uid", "gid", "groups", "umask", "logfile", "path_lookup", "pre_exec") - __slots__ = ("args", "files", "register", "unregister", "registered") + \ + __slots__ = ("args", "files", "registered", "reg_id", "scheduler") + \ _spawn_kwarg_names _file_names = ("process", "out") @@ -1573,9 +1573,9 @@ class SpawnProcess(SubProcess): os.close(slave_fd) files.process = os.fdopen(master_fd, 'r') - self.registered = True - self.register(files.process.fileno(), + self.reg_id = self.scheduler.register(files.process.fileno(), select.POLLIN, self._output_handler) + self.registered = True def _output_handler(self, fd, event): files = self.files @@ -1593,7 +1593,7 @@ class SpawnProcess(SubProcess): f.flush() f.close() self.registered = False - self.unregister(fd) + return self.registered class EbuildFetcherAsync(SpawnProcess): @@ -1766,9 +1766,8 @@ class EbuildBuild(SlotObject): (pkg_count.curval, pkg_count.maxval, pkg.cpv) logger.log(msg, short_msg=short_msg) - build = EbuildExecuter(pkg=pkg, register=scheduler.register, - schedule=scheduler.schedule, settings=settings, - unregister=scheduler.unregister) + build = EbuildExecuter(pkg=pkg, scheduler=scheduler, + settings=settings) retval = build.execute() if retval != os.EX_OK: return retval @@ -1804,9 +1803,8 @@ class EbuildBuild(SlotObject): (pkg_count.curval, pkg_count.curval, pkg.cpv) logger.log(msg, short_msg=short_msg) - build = EbuildExecuter(pkg=pkg, register=scheduler.register, - schedule=scheduler.schedule, settings=settings, - unregister=scheduler.unregister) + build = EbuildExecuter(pkg=pkg, scheduler=scheduler, + settings=settings) retval = build.execute() if retval != os.EX_OK: return retval @@ -1827,7 +1825,7 @@ class EbuildBuild(SlotObject): class EbuildExecuter(SlotObject): - __slots__ = ("pkg", "register", "schedule", "settings", "unregister") + __slots__ = ("pkg", "scheduler", "settings") _phases = ("setup", "unpack", "compile", "test", "install") @@ -1857,14 +1855,12 @@ class EbuildExecuter(SlotObject): for mydo in self._phases: ebuild_phase = EbuildPhase(fd_pipes=fd_pipes, - pkg=self.pkg, phase=mydo, register=self.register, - settings=settings, tree=tree, unregister=self.unregister) + pkg=self.pkg, phase=mydo, scheduler=self.scheduler, + settings=settings, tree=tree) ebuild_phase.start() - retval = None - while retval is None: - self.schedule() - retval = ebuild_phase.poll() + self.scheduler.schedule(ebuild_phase.reg_id) + retval = ebuild_phase.wait() if retval != os.EX_OK: return retval @@ -1874,8 +1870,8 @@ class EbuildExecuter(SlotObject): class EbuildPhase(SubProcess): __slots__ = ("fd_pipes", "phase", "pkg", - "register", "settings", "tree", "unregister", - "files", "registered") + "scheduler", "settings", "tree", + "files", "registered", "reg_id") _file_names = ("log", "stdout", "ebuild") _files_dict = slot_dict_class(_file_names, prefix="") @@ -1976,8 +1972,9 @@ class EbuildPhase(SubProcess): os.close(slave_fd) files.ebuild = os.fdopen(master_fd, 'r') + self.reg_id = self.scheduler.register(files.ebuild.fileno(), + select.POLLIN, output_handler) self.registered = True - self.register(files.ebuild.fileno(), select.POLLIN, output_handler) def _output_handler(self, fd, event): files = self.files @@ -1996,7 +1993,7 @@ class EbuildPhase(SubProcess): for f in files.values(): f.close() self.registered = False - self.unregister(fd) + return self.registered def _dummy_handler(self, fd, event): """ @@ -2017,7 +2014,7 @@ class EbuildPhase(SubProcess): for f in files.values(): f.close() self.registered = False - self.unregister(fd) + return self.registered def _set_returncode(self, wait_retval): SubProcess._set_returncode(self, wait_retval) @@ -2195,9 +2192,8 @@ class Binpkg(SlotObject): for line in wrap(waiting_msg, 65)) writemsg(waiting_msg, noiselevel=-1) - while retval is None: - scheduler.schedule() - retval = prefetcher.poll() + scheduler.schedule(prefetcher.reg_id) + retval = prefetcher.wait() del prefetcher fetcher = BinpkgFetcher(pkg=pkg, pretend=opts.pretend, @@ -2306,27 +2302,22 @@ class Binpkg(SlotObject): phase = "setup" ebuild_phase = EbuildPhase(fd_pipes=fd_pipes, - pkg=pkg, phase=phase, register=scheduler.register, - settings=settings, tree=tree, unregister=scheduler.unregister) + pkg=pkg, phase=phase, scheduler=scheduler, + settings=settings, tree=tree) ebuild_phase.start() - retval = None - while retval is None: - scheduler.schedule() - retval = ebuild_phase.poll() + scheduler.schedule(ebuild_phase.reg_id) + retval = ebuild_phase.wait() if retval != os.EX_OK: return retval extractor = BinpkgExtractorAsync(image_dir=image_dir, - pkg=pkg, pkg_path=pkg_path, register=scheduler.register, - unregister=scheduler.unregister) + pkg=pkg, pkg_path=pkg_path, scheduler=scheduler) portage.writemsg_stdout(">>> Extracting %s\n" % pkg.cpv) extractor.start() - retval = None - while retval is None: - scheduler.schedule() - retval = extractor.poll() + scheduler.schedule(extractor.reg_id) + retval = extractor.wait() if retval != os.EX_OK: writemsg("!!! Error Extracting '%s'\n" % pkg_path, @@ -7061,7 +7052,7 @@ class Scheduler(object): _fetch_log = "/var/log/emerge-fetch.log" class _iface_class(SlotObject): - __slots__ = ("register", "schedule", "unregister") + __slots__ = ("register", "schedule") class _build_opts_class(SlotObject): __slots__ = ("buildpkg", "buildpkgonly", @@ -7111,9 +7102,11 @@ class Scheduler(object): self._logger = self._emerge_log_class( xterm_titles=("notitles" not in settings.features)) self._sched_iface = self._iface_class( - register=self._register, schedule=self._schedule, - unregister=self._unregister) + register=self._register, schedule=self._schedule) self._poll_event_handlers = {} + self._poll_event_handler_ids = {} + # Increment id for each new handler. + self._event_handler_id = 0 try: self._poll = select.poll() @@ -7264,14 +7257,14 @@ class Scheduler(object): elif pkg.type_name == "ebuild": prefetcher = EbuildFetcherAsync(logfile=self._fetch_log, pkg=pkg, - register=self._register, unregister=self._unregister) + scheduler=self._sched_iface) elif pkg.type_name == "binary" and \ "--getbinpkg" in self.myopts and \ pkg.root_config.trees["bintree"].isremote(pkg.cpv): prefetcher = BinpkgFetcherAsync(logfile=self._fetch_log, - pkg=pkg, register=self._register, unregister=self._unregister) + pkg=pkg, scheduler=self._sched_iface) return prefetcher @@ -7476,30 +7469,43 @@ class Scheduler(object): return (mylist, dropped_tasks) def _register(self, f, eventmask, handler): - self._poll_event_handlers[f] = handler + """ + @rtype: Integer + @return: A unique registration id, for use in schedule() or + unregister() calls. + """ + self._event_handler_id += 1 + reg_id = self._event_handler_id + self._poll_event_handler_ids[reg_id] = f + self._poll_event_handlers[f] = (handler, reg_id) self._poll.register(f, eventmask) + return reg_id - def _unregister(self, f): + def _unregister(self, reg_id): + f = self._poll_event_handler_ids[reg_id] self._poll.unregister(f) del self._poll_event_handlers[f] + del self._poll_event_handler_ids[reg_id] self._schedule_tasks() - def _schedule(self): + def _schedule(self, wait_id): + """ + Schedule until wait_id is not longer registered + for poll() events. + @type wait_id: int + @param wait_id: a task id to wait for + """ event_handlers = self._poll_event_handlers - running_tasks = self._prefetch_queue.running_tasks + handler_ids = self._poll_event_handler_ids poll = self._poll.poll self._schedule_tasks() - while event_handlers: + while wait_id in handler_ids: for f, event in poll(): - event_handlers[f](f, event) - - if len(event_handlers) <= len(running_tasks): - # Assuming one handler per task, this - # means the caller has unregistered it's - # handler, so it's time to yield. - break + handler, reg_id = event_handlers[f] + if not handler(f, event): + self._unregister(reg_id) def _world_atom(self, pkg): """ @@ -7565,8 +7571,6 @@ class Scheduler(object): self._logger.log(" >>> emerge (%s of %s) %s to %s" % \ (pkg_count.curval, pkg_count.maxval, pkg.cpv, pkg.root)) - self._schedule() - if pkg.type_name == "ebuild": build = EbuildBuild(args_set=self._args_set, find_blockers=self._find_blockers(pkg), -- cgit v1.2.3-1-g7c22