diff options
Diffstat (limited to 'layman/remotedb.py')
-rw-r--r-- | layman/remotedb.py | 410 |
1 files changed, 410 insertions, 0 deletions
diff --git a/layman/remotedb.py b/layman/remotedb.py new file mode 100644 index 0000000..851b89b --- /dev/null +++ b/layman/remotedb.py @@ -0,0 +1,410 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +################################################################################# +# LAYMAN OVERLAY DB +################################################################################# +# File: db.py +# +# Access to the db of overlays +# +# Copyright: +# (c) 2005 - 2008 Gunnar Wrobel +# Distributed under the terms of the GNU General Public License v2 +# +# Author(s): +# Gunnar Wrobel <wrobel@gentoo.org> +# +'''Handles different storage files.''' + +from __future__ import with_statement + +__version__ = "$Id: db.py 309 2007-04-09 16:23:38Z wrobel $" + +#=============================================================================== +# +# Dependencies +# +#------------------------------------------------------------------------------- + +import os, os.path +import sys +import urllib2 +import hashlib + +from pygpg.config import GPGConfig +from pygpg.gpg import GPG + +from layman.utils import encoder +from layman.dbbase import DbBase +from layman.version import VERSION +from layman.compatibility import fileopen + +class RemoteDB(DbBase): + '''Handles fetching the remote overlay list.''' + + def __init__(self, config, ignore_init_read_errors=False): + + self.config = config + self.output = config['output'] + + self.proxies = {} + + if config['proxy']: + self.proxies['http'] = config['proxy'] + elif os.getenv('http_proxy'): + self.proxies['http'] = os.getenv('http_proxy') + + if self.proxies: + proxy_handler = urllib2.ProxyHandler(self.proxies) + opener = urllib2.build_opener(proxy_handler) + urllib2.install_opener(opener) + + self.urls = [i.strip() + for i in config['overlays'].split('\n') if len(i)] + + #pair up the list url and detached sig url + d_urls = [i.strip() + for i in config['gpg_detached_lists'].split('\n') if len(i)] + self.detached_urls = [] + #for index in range(0, len(d_urls), 2): + # self.detached_urls.append((d_urls[index], d_urls[index+1])) + for i in d_urls: + u = i.split() + self.detached_urls.append((u[0], u[1])) + + self.signed_urls = [i.strip() + for i in config['gpg_signed_lists'].split('\n') if len(i)] + + self.output.debug('RemoteDB.__init__(), url lists= \nself.urls: %s\nself.detached_urls: %s\nself.signed_urls: %s' % (str(self.urls), str(self.detached_urls), str(self.signed_urls)), 2) + + # add up the lists to load for display, etc. + # unsigned overlay lists + paths = [self.filepath(i) + '.xml' for i in self.urls] + # detach-signed lists + paths.extend([self.filepath(i[0]) + '.xml' for i in self.detached_urls]) + # single file signed, compressed, clearsigned + paths.extend([self.filepath(i) + '.xml' for i in self.signed_urls]) + + self.output.debug('RemoteDB.__init__(), paths to load = %s' %str(paths), 2) + + if config['nocheck']: + ignore = 2 + else: + ignore = 0 + + #quiet = int(config['quietness']) < 3 + + DbBase.__init__(self, config, paths=paths, ignore=ignore, + ignore_init_read_errors=ignore_init_read_errors) + + self.gpg = None + self.gpg_config = None + + + # overrider + def _broken_catalog_hint(self): + return 'Try running "sudo layman -f" to re-fetch that file' + + + def cache(self): + ''' + Copy the remote overlay list to the local cache. + + >>> import tempfile + >>> here = os.path.dirname(os.path.realpath(__file__)) + >>> tmpdir = tempfile.mkdtemp(prefix="laymantmp_") + >>> cache = os.path.join(tmpdir, 'cache') + >>> myoptions = {'overlays' : + ... ['file://' + here + '/tests/testfiles/global-overlays.xml'], + ... 'cache' : cache, + ... 'nocheck' : 'yes', + ... 'proxy' : None} + >>> from layman.config import OptionConfig + >>> config = OptionConfig(myoptions) + >>> config.set_option('quietness', 3) + >>> a = RemoteDB(config) + >>> a.cache() + (True, True) + >>> b = fileopen(a.filepath(config['overlays'])+'.xml') + >>> b.readlines()[24] + ' A collection of ebuilds from Gunnar Wrobel [wrobel@gentoo.org].\\n' + + >>> b.close() + >>> os.unlink(a.filepath(config['overlays'])+'.xml') + + >>> a.overlays.keys() + [u'wrobel', u'wrobel-stable'] + + >>> import shutil + >>> shutil.rmtree(tmpdir) + ''' + has_updates = False + self._create_storage(self.config['storage']) + # succeeded reset when a failure is detected + succeeded = True + url_lists = [self.urls, self.detached_urls, self.signed_urls] + need_gpg = [False, True, True] + for index in range(0, 3): + self.output.debug("RemoteDB.cache() index = %s" %str(index),2) + urls = url_lists[index] + if need_gpg[index] and len(urls) and self.gpg is None: + #initialize our gpg instance + self.init_gpg() + # main working loop + for url in urls: + sig = '' + self.output.debug("RemoteDB.cache() url = %s is a tuple=%s" + %(str(url), str(isinstance(url, tuple))),2) + filepath, mpath, tpath, sig = self._paths(url) + + if sig: + success, olist, timestamp = self._fetch_url( + url[0], mpath, tpath) + else: + success, olist, timestamp = self._fetch_url( + url, mpath, tpath) + if not success: + #succeeded = False + continue + + self.output.debug("RemoteDB.cache() len(olist) = %s" %str(len(olist)),2) + # GPG handling + if need_gpg[index]: + olist, verified = self.verify_gpg(url, sig, olist) + if not verified: + self.output.debug("RemoteDB.cache() gpg returned " + "verified = %s" %str(verified),2) + succeeded = False + filename = os.path.join(self.config['storage'], "Failed-to-verify-sig") + self.write_cache(olist, filename) + continue + + # Before we overwrite the old cache, check that the downloaded + # file is intact and can be parsed + if isinstance(url, tuple): + olist = self._check_download(olist, url[0]) + else: + olist = self._check_download(olist, url) + + # Ok, now we can overwrite the old cache + has_updates = max(has_updates, + self.write_cache(olist, mpath, tpath, timestamp)) + + self.output.debug("RemoteDB.cache() self.urls: has_updates, succeeded" + " %s, %s" % (str(has_updates), str(succeeded)), 4) + return has_updates, succeeded + + + def _paths(self, url): + self.output.debug("RemoteDB._paths(), url is tuple %s" % str(url),2) + if isinstance(url, tuple): + filepath = self.filepath(url[0]) + sig = filepath + '.sig' + else: + filepath = self.filepath(url) + sig = '' + mpath = filepath + '.xml' + tpath = filepath + '.timestamp' + return filepath, mpath, tpath, sig + + + @staticmethod + def _create_storage(mpath): + # Create our storage directory if it is missing + if not os.path.exists(os.path.dirname(mpath)): + try: + os.makedirs(os.path.dirname(mpath)) + except OSError, error: + raise OSError('Failed to create layman storage direct' + + 'ory ' + os.path.dirname(mpath) + '\n' + + 'Error was:' + str(error)) + return + + + def filepath(self, url): + '''Return a unique file name for the url.''' + + base = self.config['cache'] + + self.output.debug('Generating cache path.', 6) + url_encoded = encoder(url, "UTF-8") + + return base + '_' + hashlib.md5(url_encoded).hexdigest() + + + def _fetch_url(self, url, mpath, tpath=None): + self.output.debug('RemoteDB._fetch_url() url = %s' % url, 2) + # check when the cache was last updated + # and don't re-fetch it unless it has changed + request = urllib2.Request(url) + opener = urllib2.build_opener() + opener.addheaders = [('Accept-Charset', 'utf-8'), + ('User-Agent', 'Layman-' + VERSION)] + #opener.addheaders[('User-Agent', 'Layman-' + VERSION)] + + if tpath and os.path.exists(tpath): + with fileopen(tpath,'r') as previous: + timestamp = previous.read() + request.add_header('If-Modified-Since', timestamp) + + if not self.check_path([mpath]): + return (False, '', '') + + try: + self.output.debug('RemoteDB._fetch_url() connecting to opener', 2) + connection = opener.open(request) + # py2, py3 compatibility, since only py2 returns keys as lower() + headers = dict((x.lower(), x) for x in connection.headers.keys()) + if 'last-modified' in headers: + timestamp = connection.headers[headers['last-modified']] + elif 'date' in headers: + timestamp = connection.headers[headers['date']] + else: + timestamp = None + except urllib2.HTTPError, e: + if e.code == 304: + self.output.info('Remote list already up to date: %s' + % url, 4) + self.output.info('Last-modified: %s' % timestamp, 4) + else: + self.output.error('RemoteDB.cache(); HTTPError was:\n' + 'url: %s\n%s' + % (url, str(e))) + return (False, '', '') + return (False, '', '') + except IOError, error: + self.output.error('RemoteDB.cache(); Failed to update the ' + 'overlay list from: %s\nIOError was:%s\n' + % (url, str(error))) + return (False, '', '') + else: + if url.startswith('file://'): + quieter = 1 + else: + quieter = 0 + self.output.info('Fetching new list... %s' % url, 4 + quieter) + if timestamp is not None: + self.output.info('Last-modified: %s' % timestamp, 4 + quieter) + # Fetch the remote list + olist = connection.read() + self.output.debug('RemoteDB._fetch_url(), olist type = %s' %str(type(olist)),2) + + return (True, olist, timestamp) + + + def check_path(self, paths, hint=True): + '''Check for sufficient privileges''' + self.output.debug('RemoteDB.check_path; paths = ' + str(paths), 8) + is_ok = True + for path in paths: + if os.path.exists(path) and not os.access(path, os.W_OK): + if hint: + self.output.warn( + 'You do not have permission to update the cache (%s).' + % path) + import getpass + if getpass.getuser() != 'root': + self.output.warn('Hint: You are not root.\n') + is_ok = False + return is_ok + + + def _check_download(self, olist, url): + + try: + self.read(olist, origin=url) + except Exception, error: + self.output.debug("RemoteDB._check_download(), url=%s \nolist:\n" % url,2) + self.output.debug(olist,2) + raise IOError('Failed to parse the overlays list fetched fr' + 'om ' + url + '\nThis means that the download' + 'ed file is somehow corrupt or there was a pr' + 'oblem with the webserver. Check the content ' + 'of the file. Error was:\n' + str(error)) + + # the folowing is neded for py3 only + if sys.hexversion >= 0x3000000 and hasattr(olist, 'decode'): + olist = olist.decode("UTF-8") + return olist + + + @staticmethod + def write_cache(olist, mpath, tpath=None, timestamp=None): + has_updates = False + try: + out_file = fileopen(mpath, 'w') + out_file.write(olist) + out_file.close() + + if timestamp is not None and tpath is not None: + out_file = fileopen(tpath, 'w') + out_file.write(str(timestamp)) + out_file.close() + + has_updates = True + + except Exception, error: + raise IOError('Failed to temporarily cache overlays list in' + ' ' + mpath + '\nError was:\n' + str(error)) + return has_updates + + def verify_gpg(self, url, sig, olist): + '''Verify and decode it.''' + self.output.debug("RemoteDB: verify_gpg(), verify & decrypt olist: " + " %s, type(olist)=%s" % (str(url),str(type(olist))), 2) + #self.output.debug(olist, 2) + + # detached sig + if sig: + self.output.debug("RemoteDB.verify_gpg(), detached sig", 2) + self.dl_sig(url[1], sig) + gpg_result = self.gpg.verify( + inputtxt=olist, + inputfile=sig) + # armoured signed file, compressed or clearsigned + else: + self.output.debug("RemoteDB.verify_gpg(), single signed file", 2) + gpg_result = self.gpg.decrypt( + inputtxt=olist) + olist = gpg_result.output + # verify and report + self.output.debug("gpg_result, verified=%s, len(olist)=%s" + % (gpg_result.verified[0], str(len(olist))),1) + if gpg_result.verified[0]: + self.output.info("GPG verification succeeded for gpg-signed url.", 4) + self.output.info('\tSignature result:' + str(gpg_result.verified), 4) + else: + self.output.error("GPG verification failed for gpg-signed url.") + self.output.error('\tSignature result:' + str(gpg_result.verified)) + olist = '' + return olist, gpg_result.verified[0] + + + def dl_sig(self, url, sig): + self.output.debug("RemoteDB.dl_sig() url=%s, sig=%s" % (url, sig),2) + success, newsig, timestamp = self._fetch_url(url, sig) + if success: + success = self.write_cache(newsig, sig) + return success + + + def init_gpg(self): + self.output.debug("RemoteDB.init_gpg(), initializing",2) + if not self.gpg_config: + self.gpg_config = GPGConfig() + + if not self.gpg: + self.gpg = GPG(self.gpg_config) + self.output.debug("RemoteDB.init_gpg(), initialized :D",2) + + +if __name__ == '__main__': + import doctest + + # Ignore warnings here. We are just testing + from warnings import filterwarnings, resetwarnings + filterwarnings('ignore') + + doctest.testmod(sys.modules[__name__]) + + resetwarnings() |