From 72298bcb48e432011d56143525adb642bd18f1d4 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Thu, 21 Oct 2010 18:13:33 -0700 Subject: AsynchronousLock: use subprocess if no threads --- pym/_emerge/AsynchronousLock.py | 155 +++++++++++++++++++++++++++++++++++----- 1 file changed, 138 insertions(+), 17 deletions(-) (limited to 'pym/_emerge/AsynchronousLock.py') diff --git a/pym/_emerge/AsynchronousLock.py b/pym/_emerge/AsynchronousLock.py index 6eb90b4e4..bce81ed5c 100644 --- a/pym/_emerge/AsynchronousLock.py +++ b/pym/_emerge/AsynchronousLock.py @@ -3,18 +3,76 @@ import dummy_threading import fcntl +import sys + try: import threading except ImportError: import dummy_threading as threading +import portage from portage import os from portage.exception import TryAgain from portage.locks import lockfile, unlockfile from _emerge.AbstractPollTask import AbstractPollTask +from _emerge.AsynchronousTask import AsynchronousTask from _emerge.PollConstants import PollConstants +from _emerge.SpawnProcess import SpawnProcess + +class AsynchronousLock(AsynchronousTask): + """ + This uses the portage.locks module to acquire a lock asynchronously, + using either a thread (if available) or a subprocess. + """ + + __slots__ = ('path', 'scheduler',) + \ + ('_imp', '_force_async', '_force_dummy', '_force_process', \ + '_force_thread') + + def _start(self): + + if not self._force_async: + try: + self._imp = lockfile(self.path, + wantnewlockfile=True, flags=os.O_NONBLOCK) + except TryAgain: + pass + else: + self.returncode = os.EX_OK + self.wait() + return + + if self._force_process or \ + (not self._force_thread and threading is dummy_threading): + self._imp = _LockProcess(path=self.path, scheduler=self.scheduler) + else: + self._imp = _LockThread(path=self.path, + scheduler=self.scheduler, + _force_dummy=self._force_dummy) + + self._imp.addExitListener(self._imp_exit) + self._imp.start() + + def _imp_exit(self, imp): + # call exit listeners + self.wait() + + def _wait(self): + if self.returncode is not None: + return self.returncode + self.returncode = self._imp._wait() + return self.returncode + + def unlock(self): + if self._imp is None: + raise AssertionError('not locked') + if isinstance(self._imp, (_LockProcess, _LockThread)): + self._imp.unlock() + else: + unlockfile(self._imp) + self._imp = None -class AsynchronousLock(AbstractPollTask): +class _LockThread(AbstractPollTask): """ This uses the portage.locks module to acquire a lock asynchronously, using a background thread. After the lock is acquired, the thread @@ -27,25 +85,10 @@ class AsynchronousLock(AbstractPollTask): """ __slots__ = ('lock_obj', 'path',) + \ - ('_files', '_force_thread', '_force_dummy', + ('_files', '_force_dummy', '_thread', '_reg_id',) def _start(self): - - if self._force_thread: - self._start_thread() - return - - try: - self.lock_obj = lockfile(self.path, - wantnewlockfile=True, flags=os.O_NONBLOCK) - except TryAgain: - self._start_thread() - else: - self.returncode = os.EX_OK - self.wait() - - def _start_thread(self): pr, pw = os.pipe() self._files = {} self._files['pipe_read'] = os.fdopen(pr, 'rb', 0) @@ -101,3 +144,81 @@ class AsynchronousLock(AbstractPollTask): for f in self._files.values(): f.close() self._files = None + +class _LockProcess(AbstractPollTask): + """ + This uses the portage.locks module to acquire a lock asynchronously, + using a subprocess. After the lock is acquired, the process + writes to a pipe in order to notify a poll loop running in the main + process. The unlock() method notifies the subprocess to release the + lock and exit. + """ + + __slots__ = ('path', 'scheduler',) + \ + ('_proc', '_files', '_reg_id') + + def _start(self): + in_pr, in_pw = os.pipe() + out_pr, out_pw = os.pipe() + self._files = {} + self._files['pipe_in'] = os.fdopen(in_pr, 'rb', 0) + self._files['pipe_out'] = os.fdopen(out_pw, 'wb', 0) + fcntl.fcntl(in_pr, fcntl.F_SETFL, + fcntl.fcntl(in_pr, fcntl.F_GETFL) | os.O_NONBLOCK) + self._reg_id = self.scheduler.register(in_pr, + PollConstants.POLLIN, self._output_handler) + self._registered = True + self._proc = SpawnProcess( + args=[portage._python_interpreter, + os.path.join(portage._bin_path, 'lock-helper.py'), self.path], + env=os.environ, + fd_pipes={0:out_pr, 1:in_pw, 2:sys.stderr.fileno()}, + scheduler=self.scheduler) + self._proc.addExitListener(self._proc_exit) + self._proc.start() + os.close(out_pr) + os.close(in_pw) + + def _proc_exit(self, proc): + if proc.returncode != os.EX_OK: + # There's no good reason for locks to fail. + raise AssertionError('lock process failed with returncode %s' \ + % (proc.returncode,)) + + def _wait(self): + if self.returncode is not None: + return self.returncode + if self._registered: + self.scheduler.schedule(self._reg_id) + return self.returncode + + def _output_handler(self, f, event): + buf = self._read_buf(self._files['pipe_in'], event) + if buf: + self._unregister() + self.returncode = os.EX_OK + self.wait() + + def _unregister(self): + self._registered = False + + if self._reg_id is not None: + self.scheduler.unregister(self._reg_id) + self._reg_id = None + + if self._files is not None: + try: + pipe_in = self._files.pop('pipe_in') + except KeyError: + pass + else: + pipe_in.close() + + def unlock(self): + if self._proc is None: + raise AssertionError('not locked') + self._files['pipe_out'].write(b'\0') + self._files['pipe_out'].close() + self._files = None + self._proc.wait() + self._proc = None -- cgit v1.2.3-1-g7c22