diff options
-rw-r--r-- | pym/_emerge/AsynchronousLock.py | 103 | ||||
-rw-r--r-- | pym/portage/tests/locks/__init__.py | 2 | ||||
-rw-r--r-- | pym/portage/tests/locks/__test__ | 0 | ||||
-rw-r--r-- | pym/portage/tests/locks/test_asynchronous_lock.py | 29 |
4 files changed, 134 insertions, 0 deletions
diff --git a/pym/_emerge/AsynchronousLock.py b/pym/_emerge/AsynchronousLock.py new file mode 100644 index 000000000..6eb90b4e4 --- /dev/null +++ b/pym/_emerge/AsynchronousLock.py @@ -0,0 +1,103 @@ +# Copyright 2010 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import dummy_threading +import fcntl +try: + import threading +except ImportError: + import dummy_threading as threading + +from portage import os +from portage.exception import TryAgain +from portage.locks import lockfile, unlockfile +from _emerge.AbstractPollTask import AbstractPollTask +from _emerge.PollConstants import PollConstants + +class AsynchronousLock(AbstractPollTask): + """ + This uses the portage.locks module to acquire a lock asynchronously, + using a background thread. After the lock is acquired, the thread + writes to a pipe in order to notify a poll loop running in the main + thread. + + If the threading module is unavailable then the dummy_threading + module will be used, and the lock will be acquired synchronously + (before the start() method returns). + """ + + __slots__ = ('lock_obj', 'path',) + \ + ('_files', '_force_thread', '_force_dummy', + '_thread', '_reg_id',) + + def _start(self): + + if self._force_thread: + self._start_thread() + return + + try: + self.lock_obj = lockfile(self.path, + wantnewlockfile=True, flags=os.O_NONBLOCK) + except TryAgain: + self._start_thread() + else: + self.returncode = os.EX_OK + self.wait() + + def _start_thread(self): + pr, pw = os.pipe() + self._files = {} + self._files['pipe_read'] = os.fdopen(pr, 'rb', 0) + self._files['pipe_write'] = os.fdopen(pw, 'wb', 0) + for k, f in self._files.items(): + fcntl.fcntl(f.fileno(), fcntl.F_SETFL, + fcntl.fcntl(f.fileno(), fcntl.F_GETFL) | os.O_NONBLOCK) + self._reg_id = self.scheduler.register(self._files['pipe_read'].fileno(), + PollConstants.POLLIN, self._output_handler) + self._registered = True + threading_mod = threading + if self._force_dummy: + threading_mod = dummy_threading + self._thread = threading_mod.Thread(target=self._run_lock) + self._thread.start() + + def _run_lock(self): + self.lock_obj = lockfile(self.path, wantnewlockfile=True) + self._files['pipe_write'].write(b'\0') + + def _output_handler(self, f, event): + buf = self._read_buf(self._files['pipe_read'], event) + if buf: + self._unregister() + self.returncode = os.EX_OK + self.wait() + + def _wait(self): + if self.returncode is not None: + return self.returncode + if self._registered: + self.scheduler.schedule(self._reg_id) + return self.returncode + + def unlock(self): + if self.lock_obj is None: + raise AssertionError('not locked') + unlockfile(self.lock_obj) + self.lock_obj = None + + def _unregister(self): + self._registered = False + + if self._thread is not None: + self._thread.join() + self._thread = None + + if self._reg_id is not None: + self.scheduler.unregister(self._reg_id) + self._reg_id = None + + if self._files is not None: + for f in self._files.values(): + f.close() + self._files = None diff --git a/pym/portage/tests/locks/__init__.py b/pym/portage/tests/locks/__init__.py new file mode 100644 index 000000000..21a391aee --- /dev/null +++ b/pym/portage/tests/locks/__init__.py @@ -0,0 +1,2 @@ +# Copyright 2010 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 diff --git a/pym/portage/tests/locks/__test__ b/pym/portage/tests/locks/__test__ new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/pym/portage/tests/locks/__test__ diff --git a/pym/portage/tests/locks/test_asynchronous_lock.py b/pym/portage/tests/locks/test_asynchronous_lock.py new file mode 100644 index 000000000..ac38462ed --- /dev/null +++ b/pym/portage/tests/locks/test_asynchronous_lock.py @@ -0,0 +1,29 @@ +# Copyright 2010 Gentoo Foundation +# Distributed under the terms of the GNU General Public License v2 + +import shutil +import tempfile + +from portage import os +from portage.tests import TestCase +from _emerge.AsynchronousLock import AsynchronousLock +from _emerge.PollScheduler import PollScheduler + +class AsynchronousLockTestCase(TestCase): + + def testAsynchronousLock(self): + scheduler = PollScheduler().sched_iface + tempdir = tempfile.mkdtemp() + try: + path = os.path.join(tempdir, 'lock_me') + for force_thread in (True, False): + for force_dummy in (True, False): + async_lock = AsynchronousLock(path=path, + scheduler=scheduler, _force_dummy=force_dummy, + _force_thread=force_thread) + async_lock.start() + async_lock.wait() + async_lock.unlock() + self.assertEqual(async_lock.returncode, os.EX_OK) + finally: + shutil.rmtree(tempdir) |