summaryrefslogtreecommitdiffstats
path: root/pym/portage/dbapi/virtual.py
blob: 30d6c227fe54c848a6bfca69a6c270c226148c0f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
# Copyright 1998-2012 Gentoo Foundation
# Distributed under the terms of the GNU General Public License v2


from portage.dbapi import dbapi
from portage.dbapi.dep_expand import dep_expand
from portage.versions import cpv_getkey, _pkg_str

class fakedbapi(dbapi):
	"""A fake dbapi that allows consumers to inject/remove packages to/from it
	portage.settings is required to maintain the dbAPI.
	"""
	def __init__(self, settings=None, exclusive_slots=True):
		"""
		@param exclusive_slots: When True, injecting a package with SLOT
			metadata causes an existing package in the same slot to be
			automatically removed (default is True).
		@type exclusive_slots: Boolean
		"""
		self._exclusive_slots = exclusive_slots
		self.cpvdict = {}
		self.cpdict = {}
		if settings is None:
			from portage import settings
		self.settings = settings
		self._match_cache = {}

	def _clear_cache(self):
		if self._categories is not None:
			self._categories = None
		if self._match_cache:
			self._match_cache = {}

	def match(self, origdep, use_cache=1):
		atom = dep_expand(origdep, mydb=self, settings=self.settings)
		cache_key = (atom, atom.unevaluated_atom)
		result = self._match_cache.get(cache_key)
		if result is not None:
			return result[:]
		result = list(self._iter_match(atom, self.cp_list(atom.cp)))
		self._match_cache[cache_key] = result
		return result[:]

	def cpv_exists(self, mycpv, myrepo=None):
		return mycpv in self.cpvdict

	def cp_list(self, mycp, use_cache=1, myrepo=None):
		# NOTE: Cache can be safely shared with the match cache, since the
		# match cache uses the result from dep_expand for the cache_key.
		cache_key = (mycp, mycp)
		cachelist = self._match_cache.get(cache_key)
		if cachelist is not None:
			return cachelist[:]
		cpv_list = self.cpdict.get(mycp)
		if cpv_list is None:
			cpv_list = []
		self._cpv_sort_ascending(cpv_list)
		self._match_cache[cache_key] = cpv_list
		return cpv_list[:]

	def cp_all(self):
		return list(self.cpdict)

	def cpv_all(self):
		return list(self.cpvdict)

	def cpv_inject(self, mycpv, metadata=None):
		"""Adds a cpv to the list of available packages. See the
		exclusive_slots constructor parameter for behavior with
		respect to SLOT metadata.
		@param mycpv: cpv for the package to inject
		@type mycpv: str
		@param metadata: dictionary of raw metadata for aux_get() calls
		@param metadata: dict
		"""
		self._clear_cache()

		try:
			mycp = mycpv.cp
		except AttributeError:
			mycp = None
		try:
			myslot = mycpv.slot
		except AttributeError:
			myslot = None

		if mycp is None or \
			(myslot is None and metadata is not None and metadata.get('SLOT')):
			if metadata is None:
				mycpv = _pkg_str(mycpv)
			else:
				mycpv = _pkg_str(mycpv, metadata=metadata,
					settings=self.settings)

			mycp = mycpv.cp
			try:
				myslot = mycpv.slot
			except AttributeError:
				pass

		self.cpvdict[mycpv] = metadata
		if not self._exclusive_slots:
			myslot = None
		if myslot and mycp in self.cpdict:
			# If necessary, remove another package in the same SLOT.
			for cpv in self.cpdict[mycp]:
				if mycpv != cpv:
					try:
						other_slot = cpv.slot
					except AttributeError:
						pass
					else:
						if myslot == other_slot:
							self.cpv_remove(cpv)
							break

		cp_list = self.cpdict.get(mycp)
		if cp_list is None:
			cp_list = []
			self.cpdict[mycp] = cp_list
		try:
			cp_list.remove(mycpv)
		except ValueError:
			pass
		cp_list.append(mycpv)

	def cpv_remove(self,mycpv):
		"""Removes a cpv from the list of available packages."""
		self._clear_cache()
		mycp = cpv_getkey(mycpv)
		if mycpv in self.cpvdict:
			del	self.cpvdict[mycpv]
		if mycp not in self.cpdict:
			return
		while mycpv in self.cpdict[mycp]:
			del self.cpdict[mycp][self.cpdict[mycp].index(mycpv)]
		if not len(self.cpdict[mycp]):
			del self.cpdict[mycp]

	def aux_get(self, mycpv, wants, myrepo=None):
		if not self.cpv_exists(mycpv):
			raise KeyError(mycpv)
		metadata = self.cpvdict[mycpv]
		if not metadata:
			return ["" for x in wants]
		return [metadata.get(x, "") for x in wants]

	def aux_update(self, cpv, values):
		self._clear_cache()
		self.cpvdict[cpv].update(values)

class testdbapi(object):
	"""A dbapi instance with completely fake functions to get by hitting disk
	TODO(antarus): 
	This class really needs to be rewritten to have better stubs; but these work for now.
	The dbapi classes themselves need unit tests...and that will be a lot of work.
	"""

	def __init__(self):
		self.cpvs = {}
		def f(*args, **kwargs):
			return True
		fake_api = dir(dbapi)
		for call in fake_api:
			if not hasattr(self, call):
				setattr(self, call, f)