summaryrefslogtreecommitdiffstats
path: root/pym/portage/dbapi/virtual.py
blob: da15983eec21d447601c18c444cce6988ba6840e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
# Copyright 1998-2012 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2


from portage.dbapi import dbapi
from portage.dbapi.dep_expand import dep_expand
from portage.versions import cpv_getkey, _pkg_str

class fakedbapi(dbapi):
	"""A fake dbapi that allows consumers to inject/remove packages to/from it
	portage.settings is required to maintain the dbAPI.
	"""
	def __init__(self, settings=None, exclusive_slots=True):
		"""
		@param exclusive_slots: When True, injecting a package with SLOT
			metadata causes an existing package in the same slot to be
			automatically removed (default is True).
		@type exclusive_slots: Boolean
		"""
		self._exclusive_slots = exclusive_slots
		self.cpvdict = {}
		self.cpdict = {}
		if settings is None:
			from portage import settings
		self.settings = settings
		self._match_cache = {}

	def _clear_cache(self):
		if self._categories is not None:
			self._categories = None
		if self._match_cache:
			self._match_cache = {}

	def match(self, origdep, use_cache=1):
		atom = dep_expand(origdep, mydb=self, settings=self.settings)
		cache_key = (atom, atom.unevaluated_atom)
		result = self._match_cache.get(cache_key)
		if result is not None:
			return result[:]
		result = list(self._iter_match(atom, self.cp_list(atom.cp)))
		self._match_cache[cache_key] = result
		return result[:]

	def cpv_exists(self, mycpv, myrepo=None):
		return mycpv in self.cpvdict

	def cp_list(self, mycp, use_cache=1, myrepo=None):
		# NOTE: Cache can be safely shared with the match cache, since the
		# match cache uses the result from dep_expand for the cache_key.
		cache_key = (mycp, mycp)
		cachelist = self._match_cache.get(cache_key)
		if cachelist is not None:
			return cachelist[:]
		cpv_list = self.cpdict.get(mycp)
		if cpv_list is None:
			cpv_list = []
		self._cpv_sort_ascending(cpv_list)
		self._match_cache[cache_key] = cpv_list
		return cpv_list[:]

	def cp_all(self):
		return list(self.cpdict)

	def cpv_all(self):
		return list(self.cpvdict)

	def cpv_inject(self, mycpv, metadata=None):
		"""Adds a cpv to the list of available packages. See the
		exclusive_slots constructor parameter for behavior with
		respect to SLOT metadata.
		@param mycpv: cpv for the package to inject
		@type mycpv: str
		@param metadata: dictionary of raw metadata for aux_get() calls
		@param metadata: dict
		"""
		self._clear_cache()
		if not hasattr(mycpv, 'cp'):
			if metadata is None:
				mycpv = _pkg_str(mycpv)
			else:
				mycpv = _pkg_str(mycpv, slot=metadata.get('SLOT'),
					repo=metadata.get('repository'))
		mycp = mycpv.cp
		self.cpvdict[mycpv] = metadata
		myslot = None
		if self._exclusive_slots and metadata:
			myslot = metadata.get("SLOT", None)
		if myslot and mycp in self.cpdict:
			# If necessary, remove another package in the same SLOT.
			for cpv in self.cpdict[mycp]:
				if mycpv != cpv:
					other_metadata = self.cpvdict[cpv]
					if other_metadata:
						if myslot == other_metadata.get("SLOT", None):
							self.cpv_remove(cpv)
							break
		if mycp not in self.cpdict:
			self.cpdict[mycp] = []
		if not mycpv in self.cpdict[mycp]:
			self.cpdict[mycp].append(mycpv)

	def cpv_remove(self,mycpv):
		"""Removes a cpv from the list of available packages."""
		self._clear_cache()
		mycp = cpv_getkey(mycpv)
		if mycpv in self.cpvdict:
			del	self.cpvdict[mycpv]
		if mycp not in self.cpdict:
			return
		while mycpv in self.cpdict[mycp]:
			del self.cpdict[mycp][self.cpdict[mycp].index(mycpv)]
		if not len(self.cpdict[mycp]):
			del self.cpdict[mycp]

	def aux_get(self, mycpv, wants, myrepo=None):
		if not self.cpv_exists(mycpv):
			raise KeyError(mycpv)
		metadata = self.cpvdict[mycpv]
		if not metadata:
			return ["" for x in wants]
		return [metadata.get(x, "") for x in wants]

	def aux_update(self, cpv, values):
		self._clear_cache()
		self.cpvdict[cpv].update(values)

class testdbapi(object):
	"""A dbapi instance with completely fake functions to get by hitting disk
	TODO(antarus): 
	This class really needs to be rewritten to have better stubs; but these work for now.
	The dbapi classes themselves need unit tests...and that will be a lot of work.
	"""

	def __init__(self):
		self.cpvs = {}
		def f(*args, **kwargs):
			return True
		fake_api = dir(dbapi)
		for call in fake_api:
			if not hasattr(self, call):
				setattr(self, call, f)