From 72298bcb48e432011d56143525adb642bd18f1d4 Mon Sep 17 00:00:00 2001 From: Zac Medico Date: Thu, 21 Oct 2010 18:13:33 -0700 Subject: AsynchronousLock: use subprocess if no threads --- bin/lock-helper.py | 23 ++++ pym/_emerge/AsynchronousLock.py | 155 +++++++++++++++++++--- pym/_emerge/BinpkgFetcher.py | 4 +- pym/_emerge/EbuildBuildDir.py | 4 +- pym/portage/__init__.py | 1 + pym/portage/dbapi/vartree.py | 10 +- pym/portage/tests/locks/test_asynchronous_lock.py | 16 ++- 7 files changed, 186 insertions(+), 27 deletions(-) create mode 100755 bin/lock-helper.py diff --git a/bin/lock-helper.py b/bin/lock-helper.py new file mode 100755 index 000000000..5c332b2af --- /dev/null +++ b/bin/lock-helper.py @@ -0,0 +1,23 @@ +#!/usr/bin/python +# Copyright 2010 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import sys +import portage + +def main(args): + + if args and sys.hexversion < 0x3000000 and not isinstance(args[0], unicode): + for i, x in enumerate(args): + args[i] = portage._unicode_decode(x, errors='strict') + + lock_obj = portage.locks.lockfile(args[0], wantnewlockfile=True) + sys.stdout.write('\0') + sys.stdout.flush() + sys.stdin.read(1) + portage.locks.unlockfile(lock_obj) + return portage.os.EX_OK + +if __name__ == "__main__": + rval = main(sys.argv[1:]) + sys.exit(rval) diff --git a/pym/_emerge/AsynchronousLock.py b/pym/_emerge/AsynchronousLock.py index 6eb90b4e4..bce81ed5c 100644 --- a/pym/_emerge/AsynchronousLock.py +++ b/pym/_emerge/AsynchronousLock.py @@ -3,18 +3,76 @@ import dummy_threading import fcntl +import sys + try: import threading except ImportError: import dummy_threading as threading +import portage from portage import os from portage.exception import TryAgain from portage.locks import lockfile, unlockfile from _emerge.AbstractPollTask import AbstractPollTask +from _emerge.AsynchronousTask import AsynchronousTask from _emerge.PollConstants import PollConstants +from _emerge.SpawnProcess import SpawnProcess + +class AsynchronousLock(AsynchronousTask): + """ + This uses the portage.locks module to acquire a lock asynchronously, + using either a thread (if available) or a subprocess. + """ + + __slots__ = ('path', 'scheduler',) + \ + ('_imp', '_force_async', '_force_dummy', '_force_process', \ + '_force_thread') + + def _start(self): + + if not self._force_async: + try: + self._imp = lockfile(self.path, + wantnewlockfile=True, flags=os.O_NONBLOCK) + except TryAgain: + pass + else: + self.returncode = os.EX_OK + self.wait() + return + + if self._force_process or \ + (not self._force_thread and threading is dummy_threading): + self._imp = _LockProcess(path=self.path, scheduler=self.scheduler) + else: + self._imp = _LockThread(path=self.path, + scheduler=self.scheduler, + _force_dummy=self._force_dummy) + + self._imp.addExitListener(self._imp_exit) + self._imp.start() + + def _imp_exit(self, imp): + # call exit listeners + self.wait() + + def _wait(self): + if self.returncode is not None: + return self.returncode + self.returncode = self._imp._wait() + return self.returncode + + def unlock(self): + if self._imp is None: + raise AssertionError('not locked') + if isinstance(self._imp, (_LockProcess, _LockThread)): + self._imp.unlock() + else: + unlockfile(self._imp) + self._imp = None -class AsynchronousLock(AbstractPollTask): +class _LockThread(AbstractPollTask): """ This uses the portage.locks module to acquire a lock asynchronously, using a background thread. After the lock is acquired, the thread @@ -27,25 +85,10 @@ class AsynchronousLock(AbstractPollTask): """ __slots__ = ('lock_obj', 'path',) + \ - ('_files', '_force_thread', '_force_dummy', + ('_files', '_force_dummy', '_thread', '_reg_id',) def _start(self): - - if self._force_thread: - self._start_thread() - return - - try: - self.lock_obj = lockfile(self.path, - wantnewlockfile=True, flags=os.O_NONBLOCK) - except TryAgain: - self._start_thread() - else: - self.returncode = os.EX_OK - self.wait() - - def _start_thread(self): pr, pw = os.pipe() self._files = {} self._files['pipe_read'] = os.fdopen(pr, 'rb', 0) @@ -101,3 +144,81 @@ class AsynchronousLock(AbstractPollTask): for f in self._files.values(): f.close() self._files = None + +class _LockProcess(AbstractPollTask): + """ + This uses the portage.locks module to acquire a lock asynchronously, + using a subprocess. After the lock is acquired, the process + writes to a pipe in order to notify a poll loop running in the main + process. The unlock() method notifies the subprocess to release the + lock and exit. + """ + + __slots__ = ('path', 'scheduler',) + \ + ('_proc', '_files', '_reg_id') + + def _start(self): + in_pr, in_pw = os.pipe() + out_pr, out_pw = os.pipe() + self._files = {} + self._files['pipe_in'] = os.fdopen(in_pr, 'rb', 0) + self._files['pipe_out'] = os.fdopen(out_pw, 'wb', 0) + fcntl.fcntl(in_pr, fcntl.F_SETFL, + fcntl.fcntl(in_pr, fcntl.F_GETFL) | os.O_NONBLOCK) + self._reg_id = self.scheduler.register(in_pr, + PollConstants.POLLIN, self._output_handler) + self._registered = True + self._proc = SpawnProcess( + args=[portage._python_interpreter, + os.path.join(portage._bin_path, 'lock-helper.py'), self.path], + env=os.environ, + fd_pipes={0:out_pr, 1:in_pw, 2:sys.stderr.fileno()}, + scheduler=self.scheduler) + self._proc.addExitListener(self._proc_exit) + self._proc.start() + os.close(out_pr) + os.close(in_pw) + + def _proc_exit(self, proc): + if proc.returncode != os.EX_OK: + # There's no good reason for locks to fail. + raise AssertionError('lock process failed with returncode %s' \ + % (proc.returncode,)) + + def _wait(self): + if self.returncode is not None: + return self.returncode + if self._registered: + self.scheduler.schedule(self._reg_id) + return self.returncode + + def _output_handler(self, f, event): + buf = self._read_buf(self._files['pipe_in'], event) + if buf: + self._unregister() + self.returncode = os.EX_OK + self.wait() + + def _unregister(self): + self._registered = False + + if self._reg_id is not None: + self.scheduler.unregister(self._reg_id) + self._reg_id = None + + if self._files is not None: + try: + pipe_in = self._files.pop('pipe_in') + except KeyError: + pass + else: + pipe_in.close() + + def unlock(self): + if self._proc is None: + raise AssertionError('not locked') + self._files['pipe_out'].write(b'\0') + self._files['pipe_out'].close() + self._files = None + self._proc.wait() + self._proc = None diff --git a/pym/_emerge/BinpkgFetcher.py b/pym/_emerge/BinpkgFetcher.py index 6fbce97dc..9876cf444 100644 --- a/pym/_emerge/BinpkgFetcher.py +++ b/pym/_emerge/BinpkgFetcher.py @@ -144,7 +144,7 @@ class BinpkgFetcher(SpawnProcess): scheduler=self.scheduler) async_lock.start() async_lock.wait() - self._lock_obj = async_lock.lock_obj + self._lock_obj = async_lock self.locked = True class AlreadyLocked(portage.exception.PortageException): @@ -153,7 +153,7 @@ class BinpkgFetcher(SpawnProcess): def unlock(self): if self._lock_obj is None: return - portage.locks.unlockfile(self._lock_obj) + self._lock_obj.unlock() self._lock_obj = None self.locked = False diff --git a/pym/_emerge/EbuildBuildDir.py b/pym/_emerge/EbuildBuildDir.py index de3f56dfb..bdb7fbc7d 100644 --- a/pym/_emerge/EbuildBuildDir.py +++ b/pym/_emerge/EbuildBuildDir.py @@ -46,7 +46,7 @@ class EbuildBuildDir(SlotObject): scheduler=self.scheduler) builddir_lock.start() builddir_lock.wait() - self._lock_obj = builddir_lock.lock_obj + self._lock_obj = builddir_lock self.settings['PORTAGE_BUILDIR_LOCKED'] = '1' finally: self.locked = self._lock_obj is not None @@ -70,7 +70,7 @@ class EbuildBuildDir(SlotObject): if self._lock_obj is None: return - portage.locks.unlockdir(self._lock_obj) + self._lock_obj.unlock() self._lock_obj = None self.locked = False self.settings.pop('PORTAGE_BUILDIR_LOCKED', None) diff --git a/pym/portage/__init__.py b/pym/portage/__init__.py index d85a32a20..2dedde49b 100644 --- a/pym/portage/__init__.py +++ b/pym/portage/__init__.py @@ -327,6 +327,7 @@ except (ImportError, OSError) as e: # =========================================================================== _python_interpreter = os.path.realpath(sys.executable) +_bin_path = PORTAGE_BIN_PATH def _ensure_default_encoding(): diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py index 78a1af3b8..18a3c0d7c 100644 --- a/pym/portage/dbapi/vartree.py +++ b/pym/portage/dbapi/vartree.py @@ -1300,11 +1300,14 @@ class dblink(object): scheduler=self._scheduler) async_lock.start() async_lock.wait() - self._lock_vdb = async_lock.lock_obj + self._lock_vdb = async_lock def unlockdb(self): - if self._lock_vdb: - unlockdir(self._lock_vdb) + if self._lock_vdb is not None: + if isinstance(self._lock_vdb, AsynchronousLock): + self._lock_vdb.unlock() + else: + unlockdir(self._lock_vdb) self._lock_vdb = None def getpath(self): @@ -3825,6 +3828,7 @@ class dblink(object): settings.backup_changes(var_name) shutil.copytree(var_orig, var_new, symlinks=True) os.chmod(var_new, dir_perms) + portage._bin_path = settings['PORTAGE_BIN_PATH'] os.chmod(base_path_tmp, dir_perms) # This serves so pre-load the modules. _preload_elog_modules(self.settings) diff --git a/pym/portage/tests/locks/test_asynchronous_lock.py b/pym/portage/tests/locks/test_asynchronous_lock.py index ac38462ed..7e9fdfec9 100644 --- a/pym/portage/tests/locks/test_asynchronous_lock.py +++ b/pym/portage/tests/locks/test_asynchronous_lock.py @@ -16,14 +16,24 @@ class AsynchronousLockTestCase(TestCase): tempdir = tempfile.mkdtemp() try: path = os.path.join(tempdir, 'lock_me') - for force_thread in (True, False): + for force_async in (True, False): for force_dummy in (True, False): async_lock = AsynchronousLock(path=path, - scheduler=scheduler, _force_dummy=force_dummy, - _force_thread=force_thread) + scheduler=scheduler, _force_async=force_async, + _force_thread=True, + _force_dummy=force_dummy) async_lock.start() async_lock.wait() async_lock.unlock() self.assertEqual(async_lock.returncode, os.EX_OK) + + async_lock = AsynchronousLock(path=path, + scheduler=scheduler, _force_async=force_async, + _force_process=True) + async_lock.start() + async_lock.wait() + async_lock.unlock() + self.assertEqual(async_lock.returncode, os.EX_OK) + finally: shutil.rmtree(tempdir) -- cgit v1.2.3-1-g7c22