summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorZac Medico <zmedico@gentoo.org>2010-10-21 18:13:33 -0700
committerZac Medico <zmedico@gentoo.org>2010-10-21 18:13:33 -0700
commit72298bcb48e432011d56143525adb642bd18f1d4 (patch)
treea3b2bcd8538db2ddec5c5ff8a3d623209ef7fc78
parentf2d7564b93b4c5387fe82d8a8561b6420c04dee7 (diff)
downloadportage-72298bcb48e432011d56143525adb642bd18f1d4.tar.gz
portage-72298bcb48e432011d56143525adb642bd18f1d4.tar.bz2
portage-72298bcb48e432011d56143525adb642bd18f1d4.zip
AsynchronousLock: use subprocess if no threads
-rwxr-xr-xbin/lock-helper.py23
-rw-r--r--pym/_emerge/AsynchronousLock.py155
-rw-r--r--pym/_emerge/BinpkgFetcher.py4
-rw-r--r--pym/_emerge/EbuildBuildDir.py4
-rw-r--r--pym/portage/__init__.py1
-rw-r--r--pym/portage/dbapi/vartree.py10
-rw-r--r--pym/portage/tests/locks/test_asynchronous_lock.py16
7 files changed, 186 insertions, 27 deletions
diff --git a/bin/lock-helper.py b/bin/lock-helper.py
new file mode 100755
index 000000000..5c332b2af
--- /dev/null
+++ b/bin/lock-helper.py
@@ -0,0 +1,23 @@
+#!/usr/bin/python
+# Copyright 2010 Gentoo Foundation
+# Distributed under the terms of the GNU General Public License v2
+
+import sys
+import portage
+
+def main(args):
+
+ if args and sys.hexversion < 0x3000000 and not isinstance(args[0], unicode):
+ for i, x in enumerate(args):
+ args[i] = portage._unicode_decode(x, errors='strict')
+
+ lock_obj = portage.locks.lockfile(args[0], wantnewlockfile=True)
+ sys.stdout.write('\0')
+ sys.stdout.flush()
+ sys.stdin.read(1)
+ portage.locks.unlockfile(lock_obj)
+ return portage.os.EX_OK
+
+if __name__ == "__main__":
+ rval = main(sys.argv[1:])
+ sys.exit(rval)
diff --git a/pym/_emerge/AsynchronousLock.py b/pym/_emerge/AsynchronousLock.py
index 6eb90b4e4..bce81ed5c 100644
--- a/pym/_emerge/AsynchronousLock.py
+++ b/pym/_emerge/AsynchronousLock.py
@@ -3,18 +3,76 @@
import dummy_threading
import fcntl
+import sys
+
try:
import threading
except ImportError:
import dummy_threading as threading
+import portage
from portage import os
from portage.exception import TryAgain
from portage.locks import lockfile, unlockfile
from _emerge.AbstractPollTask import AbstractPollTask
+from _emerge.AsynchronousTask import AsynchronousTask
from _emerge.PollConstants import PollConstants
+from _emerge.SpawnProcess import SpawnProcess
+
+class AsynchronousLock(AsynchronousTask):
+ """
+ This uses the portage.locks module to acquire a lock asynchronously,
+ using either a thread (if available) or a subprocess.
+ """
+
+ __slots__ = ('path', 'scheduler',) + \
+ ('_imp', '_force_async', '_force_dummy', '_force_process', \
+ '_force_thread')
+
+ def _start(self):
+
+ if not self._force_async:
+ try:
+ self._imp = lockfile(self.path,
+ wantnewlockfile=True, flags=os.O_NONBLOCK)
+ except TryAgain:
+ pass
+ else:
+ self.returncode = os.EX_OK
+ self.wait()
+ return
+
+ if self._force_process or \
+ (not self._force_thread and threading is dummy_threading):
+ self._imp = _LockProcess(path=self.path, scheduler=self.scheduler)
+ else:
+ self._imp = _LockThread(path=self.path,
+ scheduler=self.scheduler,
+ _force_dummy=self._force_dummy)
+
+ self._imp.addExitListener(self._imp_exit)
+ self._imp.start()
+
+ def _imp_exit(self, imp):
+ # call exit listeners
+ self.wait()
+
+ def _wait(self):
+ if self.returncode is not None:
+ return self.returncode
+ self.returncode = self._imp._wait()
+ return self.returncode
+
+ def unlock(self):
+ if self._imp is None:
+ raise AssertionError('not locked')
+ if isinstance(self._imp, (_LockProcess, _LockThread)):
+ self._imp.unlock()
+ else:
+ unlockfile(self._imp)
+ self._imp = None
-class AsynchronousLock(AbstractPollTask):
+class _LockThread(AbstractPollTask):
"""
This uses the portage.locks module to acquire a lock asynchronously,
using a background thread. After the lock is acquired, the thread
@@ -27,25 +85,10 @@ class AsynchronousLock(AbstractPollTask):
"""
__slots__ = ('lock_obj', 'path',) + \
- ('_files', '_force_thread', '_force_dummy',
+ ('_files', '_force_dummy',
'_thread', '_reg_id',)
def _start(self):
-
- if self._force_thread:
- self._start_thread()
- return
-
- try:
- self.lock_obj = lockfile(self.path,
- wantnewlockfile=True, flags=os.O_NONBLOCK)
- except TryAgain:
- self._start_thread()
- else:
- self.returncode = os.EX_OK
- self.wait()
-
- def _start_thread(self):
pr, pw = os.pipe()
self._files = {}
self._files['pipe_read'] = os.fdopen(pr, 'rb', 0)
@@ -101,3 +144,81 @@ class AsynchronousLock(AbstractPollTask):
for f in self._files.values():
f.close()
self._files = None
+
+class _LockProcess(AbstractPollTask):
+ """
+ This uses the portage.locks module to acquire a lock asynchronously,
+ using a subprocess. After the lock is acquired, the process
+ writes to a pipe in order to notify a poll loop running in the main
+ process. The unlock() method notifies the subprocess to release the
+ lock and exit.
+ """
+
+ __slots__ = ('path', 'scheduler',) + \
+ ('_proc', '_files', '_reg_id')
+
+ def _start(self):
+ in_pr, in_pw = os.pipe()
+ out_pr, out_pw = os.pipe()
+ self._files = {}
+ self._files['pipe_in'] = os.fdopen(in_pr, 'rb', 0)
+ self._files['pipe_out'] = os.fdopen(out_pw, 'wb', 0)
+ fcntl.fcntl(in_pr, fcntl.F_SETFL,
+ fcntl.fcntl(in_pr, fcntl.F_GETFL) | os.O_NONBLOCK)
+ self._reg_id = self.scheduler.register(in_pr,
+ PollConstants.POLLIN, self._output_handler)
+ self._registered = True
+ self._proc = SpawnProcess(
+ args=[portage._python_interpreter,
+ os.path.join(portage._bin_path, 'lock-helper.py'), self.path],
+ env=os.environ,
+ fd_pipes={0:out_pr, 1:in_pw, 2:sys.stderr.fileno()},
+ scheduler=self.scheduler)
+ self._proc.addExitListener(self._proc_exit)
+ self._proc.start()
+ os.close(out_pr)
+ os.close(in_pw)
+
+ def _proc_exit(self, proc):
+ if proc.returncode != os.EX_OK:
+ # There's no good reason for locks to fail.
+ raise AssertionError('lock process failed with returncode %s' \
+ % (proc.returncode,))
+
+ def _wait(self):
+ if self.returncode is not None:
+ return self.returncode
+ if self._registered:
+ self.scheduler.schedule(self._reg_id)
+ return self.returncode
+
+ def _output_handler(self, f, event):
+ buf = self._read_buf(self._files['pipe_in'], event)
+ if buf:
+ self._unregister()
+ self.returncode = os.EX_OK
+ self.wait()
+
+ def _unregister(self):
+ self._registered = False
+
+ if self._reg_id is not None:
+ self.scheduler.unregister(self._reg_id)
+ self._reg_id = None
+
+ if self._files is not None:
+ try:
+ pipe_in = self._files.pop('pipe_in')
+ except KeyError:
+ pass
+ else:
+ pipe_in.close()
+
+ def unlock(self):
+ if self._proc is None:
+ raise AssertionError('not locked')
+ self._files['pipe_out'].write(b'\0')
+ self._files['pipe_out'].close()
+ self._files = None
+ self._proc.wait()
+ self._proc = None
diff --git a/pym/_emerge/BinpkgFetcher.py b/pym/_emerge/BinpkgFetcher.py
index 6fbce97dc..9876cf444 100644
--- a/pym/_emerge/BinpkgFetcher.py
+++ b/pym/_emerge/BinpkgFetcher.py
@@ -144,7 +144,7 @@ class BinpkgFetcher(SpawnProcess):
scheduler=self.scheduler)
async_lock.start()
async_lock.wait()
- self._lock_obj = async_lock.lock_obj
+ self._lock_obj = async_lock
self.locked = True
class AlreadyLocked(portage.exception.PortageException):
@@ -153,7 +153,7 @@ class BinpkgFetcher(SpawnProcess):
def unlock(self):
if self._lock_obj is None:
return
- portage.locks.unlockfile(self._lock_obj)
+ self._lock_obj.unlock()
self._lock_obj = None
self.locked = False
diff --git a/pym/_emerge/EbuildBuildDir.py b/pym/_emerge/EbuildBuildDir.py
index de3f56dfb..bdb7fbc7d 100644
--- a/pym/_emerge/EbuildBuildDir.py
+++ b/pym/_emerge/EbuildBuildDir.py
@@ -46,7 +46,7 @@ class EbuildBuildDir(SlotObject):
scheduler=self.scheduler)
builddir_lock.start()
builddir_lock.wait()
- self._lock_obj = builddir_lock.lock_obj
+ self._lock_obj = builddir_lock
self.settings['PORTAGE_BUILDIR_LOCKED'] = '1'
finally:
self.locked = self._lock_obj is not None
@@ -70,7 +70,7 @@ class EbuildBuildDir(SlotObject):
if self._lock_obj is None:
return
- portage.locks.unlockdir(self._lock_obj)
+ self._lock_obj.unlock()
self._lock_obj = None
self.locked = False
self.settings.pop('PORTAGE_BUILDIR_LOCKED', None)
diff --git a/pym/portage/__init__.py b/pym/portage/__init__.py
index d85a32a20..2dedde49b 100644
--- a/pym/portage/__init__.py
+++ b/pym/portage/__init__.py
@@ -327,6 +327,7 @@ except (ImportError, OSError) as e:
# ===========================================================================
_python_interpreter = os.path.realpath(sys.executable)
+_bin_path = PORTAGE_BIN_PATH
def _ensure_default_encoding():
diff --git a/pym/portage/dbapi/vartree.py b/pym/portage/dbapi/vartree.py
index 78a1af3b8..18a3c0d7c 100644
--- a/pym/portage/dbapi/vartree.py
+++ b/pym/portage/dbapi/vartree.py
@@ -1300,11 +1300,14 @@ class dblink(object):
scheduler=self._scheduler)
async_lock.start()
async_lock.wait()
- self._lock_vdb = async_lock.lock_obj
+ self._lock_vdb = async_lock
def unlockdb(self):
- if self._lock_vdb:
- unlockdir(self._lock_vdb)
+ if self._lock_vdb is not None:
+ if isinstance(self._lock_vdb, AsynchronousLock):
+ self._lock_vdb.unlock()
+ else:
+ unlockdir(self._lock_vdb)
self._lock_vdb = None
def getpath(self):
@@ -3825,6 +3828,7 @@ class dblink(object):
settings.backup_changes(var_name)
shutil.copytree(var_orig, var_new, symlinks=True)
os.chmod(var_new, dir_perms)
+ portage._bin_path = settings['PORTAGE_BIN_PATH']
os.chmod(base_path_tmp, dir_perms)
# This serves so pre-load the modules.
_preload_elog_modules(self.settings)
diff --git a/pym/portage/tests/locks/test_asynchronous_lock.py b/pym/portage/tests/locks/test_asynchronous_lock.py
index ac38462ed..7e9fdfec9 100644
--- a/pym/portage/tests/locks/test_asynchronous_lock.py
+++ b/pym/portage/tests/locks/test_asynchronous_lock.py
@@ -16,14 +16,24 @@ class AsynchronousLockTestCase(TestCase):
tempdir = tempfile.mkdtemp()
try:
path = os.path.join(tempdir, 'lock_me')
- for force_thread in (True, False):
+ for force_async in (True, False):
for force_dummy in (True, False):
async_lock = AsynchronousLock(path=path,
- scheduler=scheduler, _force_dummy=force_dummy,
- _force_thread=force_thread)
+ scheduler=scheduler, _force_async=force_async,
+ _force_thread=True,
+ _force_dummy=force_dummy)
async_lock.start()
async_lock.wait()
async_lock.unlock()
self.assertEqual(async_lock.returncode, os.EX_OK)
+
+ async_lock = AsynchronousLock(path=path,
+ scheduler=scheduler, _force_async=force_async,
+ _force_process=True)
+ async_lock.start()
+ async_lock.wait()
+ async_lock.unlock()
+ self.assertEqual(async_lock.returncode, os.EX_OK)
+
finally:
shutil.rmtree(tempdir)