1 # Copyright (C) 2015 Ipsilon project Contributors, for license see COPYING
3 from ipsilon.util.log import Log
# Constructor: validates the optional `mappings` and `allowed` arguments and
# stores them on the instance as self.mappings / self.allowed.
# NOTE(review): this excerpt shows only some of the original lines (each line
# carries its original line number and the numbering has gaps), so several
# statements -- e.g. the loop that binds `el` and the default/else branches --
# are not visible here.
10 def __init__(self, mappings=None, allowed=None):
11 """ A Policy engine to filter attributes.
12 Mappings is a list of lists where the first value is a list itself
13 and the second value is an attribute name or a list if the values
14 should go in a sub dictionary.
15 Note that mappings is a list and not a dictionary as this allows
16 to map the same original attribute to different resulting attributes
17 if wanted, by simply repeating the 'key list' with different values
20 Example: [[['extras', 'shoes'], 'shoeNumber']]
22 A '*' can be used to allow any attribute.
24 The default mapping is [[['*'], '*']]
25 This copies all attributes without transformation.
27 Allowed is a list of allowed attributes.
28 Normally mapping should be called before filtering, this means
29 allowed attributes should name the mapped attributes.
30 Allowed attributes can be multi-element lists
32 Example: ['fullname', ['groups', 'domain users']]
34 Allowed is '*' by default.
# A caller-supplied mappings value must be a list; anything else is rejected
# with ValueError.
39 if not isinstance(mappings, list):
40 raise ValueError("Mappings should be a list not '%s'" %
# Every entry of the mappings list must itself be a list ...
43 if not isinstance(el, list):
44 raise ValueError("Mappings must be lists, not '%s'" %
# ... made of exactly two elements (the length check itself is on a line not
# shown in this excerpt).
47 raise ValueError("Mappings must contain 2 elements, "
# When the first (source) element is a list it may hold at most 2 values.
49 if isinstance(el[0], list) and len(el[0]) > 2:
50 raise ValueError("1st Mapping element can contain at "
51 "most 2 values, found %d" % len(el[0]))
# Same limit for the second (destination) element.
52 if isinstance(el[1], list) and len(el[1]) > 2:
53 raise ValueError("2nd Mapping element can contain at "
54 "most 2 values, found %d" % len(el[1]))
55 self.mappings = mappings
57 # default mapping, return all userdata and groups
# NOTE(review): the docstring spells the default as [[['*'], '*']] while the
# value stored here is [['*', '*']] (the key is a plain string, not a list).
# map_attributes has a branch for non-list keys; presumably the two forms are
# equivalent -- confirm.
59 self.mappings = [['*', '*']]
# A caller-supplied allowed value must be a list as well.
63 if not isinstance(allowed, list):
64 raise ValueError("Allowed should be a list not '%s'" %
66 self.allowed = allowed
# Apply self.mappings to the `attributes` dictionary.
# Returns the pair (mapped, not_mapped).
# Raises ValueError when `attributes` is not a dict.
# NOTE(review): this excerpt shows only some of the original lines (the
# embedded line numbers have gaps); names such as mapped, PD, ND, prefix,
# name, mapprefix and mapname are bound on lines that are not visible here.
68 def map_attributes(self, attributes, ignore_case=False):
70 if not isinstance(attributes, dict):
71 raise ValueError("Attributes must be dictionary, not %s" %
# not_mapped starts as a deep copy of the input; entries are deleted from it
# below as they get mapped, so the caller's dictionary is not modified by
# those deletions.
74 not_mapped = copy.deepcopy(attributes)
77 # If ignore_case is True,
78 # then PD translates case insensitively prefixes
80 for k in attributes.keys():
82 # note duplicates that differ only by case
83 # will be lost here, beware!
# Each mapping is unpacked as a (key, value) pair; non-list keys and values
# are handled by branches whose bodies are not visible in this excerpt.
88 for (key, value) in self.mappings:
89 if not isinstance(key, list):
98 if not isinstance(value, list):
109 prefix = prefix.lower()
# PD maps the (possibly lower-cased) prefix to the key used in attributes.
114 attr = attributes[PD[prefix]]
116 # '*' in a prefix matches nothing
119 # If ignore_case is True,
120 # then ND translates case insensitively names
122 if isinstance(attr, list):
128 # note duplicates that differ only by case
129 # will be lost here, beware!
# A specific name was requested and it is present in attr.
137 if name in ND and ND[name] in attr:
# attr is a list: the mapped name itself is appended to the result list.
138 if isinstance(attr, list):
140 if mapprefix not in mapped:
141 mapped[mapprefix] = list()
142 mapped[mapprefix].append(mapname)
# Remove every occurrence of the mapped value from the unmapped copy.
144 if PD[prefix] in not_mapped:
145 while ND[name] in not_mapped[PD[prefix]]:
146 not_mapped[PD[prefix]].remove(ND[name])
148 if mapname not in mapped:
149 mapped[mapname] = list()
150 mapped[mapname].append(attr[ND[name]])
152 if PD[prefix] in not_mapped:
153 del not_mapped[PD[prefix]]
# Deep-copy the value so the mapped result does not share objects with the
# input attributes.
155 mapin = copy.deepcopy(attr[ND[name]])
159 if mapprefix not in mapped:
160 mapped[mapprefix] = dict()
161 mapped[mapprefix].update({mapname: mapin})
163 mapped.update({mapname: mapin})
# Drop the attribute that was just mapped from the unmapped copy, either
# under its prefix or at the top level.
166 if PD[prefix] in not_mapped:
167 if ND[name] in not_mapped[PD[prefix]]:
168 del not_mapped[PD[prefix]][ND[name]]
169 elif ND[name] in not_mapped:
170 del not_mapped[ND[name]]
# Whole-value copy; presumably the name == '*' case referred to by the
# comment below -- the governing condition is not visible in this excerpt.
172 mapin = copy.deepcopy(attr)
173 # mapname is ignored if name == '*'
175 if mapprefix not in mapped:
176 mapped[mapprefix] = mapin
178 mapped[mapprefix].update(mapin)
182 if prefix and PD[prefix] in not_mapped:
183 del not_mapped[PD[prefix]]
189 return mapped, not_mapped
# Filter `attributes` against self.allowed.
# The entries named in self.allowed are collected into `filtered`; when the
# whitelist flag is off the code further down treats `filtered` as the
# blacklist and removes those entries from a deep copy of `attributes`.
# NOTE(review): this excerpt shows only some of the original lines (the
# embedded line numbers have gaps); the initialisation of `filtered`, the
# binding of key/value/lvl2, the branch conditions on `whitelist` and the
# return statement are not visible here.
191 def filter_attributes(self, attributes, whitelist=True):
195 for name in self.allowed:
# A list entry addresses a nested item; key and value are presumably taken
# from the entry's elements on lines not shown -- confirm.
196 if isinstance(name, list):
199 if key in attributes:
200 attr = attributes[key]
# For a dict attribute only the selected sub-key is copied over.
203 elif isinstance(attr, dict):
204 if key not in filtered:
205 filtered[key] = dict()
207 filtered[key][value] = attr[value]
# For a list attribute the selected value itself is appended.
208 elif isinstance(attr, list):
209 if key not in filtered:
210 filtered[key] = list()
212 filtered[key].append(value)
# Plain (non-list) entry: copy the whole attribute when present.
216 if name in attributes:
217 filtered[name] = attributes[name]
# Everything passes through unchanged; presumably the branch taken when no
# allowed list restricts the result -- the condition is not visible here.
219 filtered = attributes
224 # filtered contains the blacklisted
# Start from a deep copy of the input and delete what was collected above.
225 allowed = copy.deepcopy(attributes)
226 for lvl1 in filtered:
227 attr = filtered[lvl1]
228 if isinstance(attr, dict):
230 del allowed[lvl1][lvl2]
231 elif isinstance(attr, list):
233 allowed[lvl1].remove(lvl2)
# A container left empty after the removals gets special handling on a line
# not shown in this excerpt.
236 if len(allowed[lvl1]) == 0:
# Ad-hoc self-test executed when the module is run directly. It prints the
# name of each check and, on mismatch, the expected and obtained values.
# The print statements use Python 2 syntax.
# NOTE(review): this excerpt shows only some of the original lines (the
# embedded line numbers have gaps), so some fixture entries, the first Policy
# construction and the success/else branches are not visible here.
242 if __name__ == '__main__':
# Input fixture: plain values, nested dictionaries and lists.
247 t_attributes = {'onenameone': 'onevalueone',
248 'onenametwo': 'onevaluetwo',
249 'two': {'twonameone': 'twovalueone',
250 'twonametwo': 'twovaluetwo'},
251 'three': {'threenameone': 'threevalueone',
252 'threenametwo': 'threevaluetwo'},
253 'four': {'fournameone': 'fourvalueone',
254 'fournametwo': 'fourvaluetwo'},
255 'five': ['one', 'two', 'three'],
256 'six': ['one', 'two', 'three']}
258 # test defaults first
261 print 'Default attribute mapping'
262 m, n = p.map_attributes(t_attributes)
# With the defaults the mapped result must equal the input and the unmapped
# part must be None.
263 if m == t_attributes and n is None:
267 print 'FAIL: Expected %s\nObtained %s' % (t_attributes, m)
269 print 'Default attribute filtering'
270 f = p.filter_attributes(t_attributes)
271 if f == t_attributes:
275 print 'Expected %s\nObtained %s' % (t_attributes, f)
277 # test custom mappings and filters
278 t_mappings = [[['onenameone'], 'onemappedone'],
279 [['onenametwo'], 'onemappedtwo'],
281 [['three', 'threenameone'], 'threemappedone'],
282 [['three', 'threenameone'], 'threemappedbis'],
283 [['four', '*'], ['four', '*']],
284 [['five'], 'listfive'],
285 [['six', 'one'], ['six', 'mapone']]]
# Expected mapped result for t_mappings applied to t_attributes.
287 m_result = {'onemappedone': 'onevalueone',
288 'onemappedtwo': 'onevaluetwo',
289 'twonameone': 'twovalueone',
290 'twonametwo': 'twovaluetwo',
291 'threemappedone': 'threevalueone',
292 'threemappedbis': 'threevalueone',
293 'four': {'fournameone': 'fourvalueone',
294 'fournametwo': 'fourvaluetwo'},
295 'listfive': ['one', 'two', 'three'],
# Expected leftovers that no mapping consumed.
298 n_result = {'three': {'threenametwo': 'threevaluetwo'},
299 'six': ['two', 'three']}
301 t_allowed = ['twonameone',
302 ['four', 'fournametwo'],
303 ['listfive', 'three'],
306 f_result = {'twonameone': 'twovalueone',
307 'four': {'fournametwo': 'fourvaluetwo'},
308 'listfive': ['three'],
311 p = Policy(t_mappings, t_allowed)
313 print 'Custom attribute mapping'
314 m, n = p.map_attributes(t_attributes)
315 if m == m_result and n == n_result:
319 print 'Expected %s\nObtained %s' % (m_result, m)
321 print 'Custom attribute filtering'
322 f = p.filter_attributes(m)
327 print 'Expected %s\nObtained %s' % (f_result, f)
# Second filter fixture, used below with whitelist=False.
329 t2_allowed = ['onemappedone', 'twonametwo', 'threemappedone',
332 f2_result = {'onemappedtwo': 'onevaluetwo',
333 'twonameone': 'twovalueone',
334 'threemappedbis': 'threevalueone',
335 'four': {'fournameone': 'fourvalueone',
336 'fournametwo': 'fourvaluetwo'},
337 'listfive': ['one', 'three'],
340 p = Policy(t_mappings, t2_allowed)
342 print 'Custom attribute filtering 2'
343 m, _ = p.map_attributes(t_attributes)
344 f = p.filter_attributes(m, whitelist=False)
349 print 'Expected %s\nObtained %s' % (f2_result, f)
351 # Case Insensitive matching
# Same shape as t_attributes but with mixed-case keys and values.
352 tci_attributes = {'oneNameone': 'onevalueone',
353 'onenamEtwo': 'onevaluetwo',
354 'Two': {'twonameone': 'twovalueone',
355 'twonameTwo': 'twovaluetwo'},
356 'thrEE': {'threeNAMEone': 'threevalueone',
357 'thrEEnametwo': 'threevaluetwo'},
358 'foUr': {'fournameone': 'fourvalueone',
359 'fournametwo': 'fourvaluetwo'},
360 'FIVE': ['one', 'two', 'three'],
361 'six': ['ONE', 'two', 'three']}
363 tci_mappings = [[['onenameone'], 'onemappedone'],
364 [['onenametwo'], 'onemappedtwo'],
366 [['three', 'threenameone'], 'threemappedone'],
367 [['three', 'threenameone'], 'threemappedbis'],
368 [['four', '*'], ['Four', '*']],
369 [['five'], 'listfive'],
370 [['six', 'one'], ['six', 'mapone']]]
372 mci_result = {'onemappedone': 'onevalueone',
373 'onemappedtwo': 'onevaluetwo',
374 'twonameone': 'twovalueone',
375 'twonameTwo': 'twovaluetwo',
376 'threemappedone': 'threevalueone',
377 'threemappedbis': 'threevalueone',
378 'Four': {'fournameone': 'fourvalueone',
379 'fournametwo': 'fourvaluetwo'},
380 'listfive': ['one', 'two', 'three'],
# The expected leftovers keep the original (mixed-case) spelling of the keys.
383 nci_result = {'thrEE': {'thrEEnametwo': 'threevaluetwo'},
384 'six': ['two', 'three']}
386 p = Policy(tci_mappings)
387 print 'Case insensitive attribute mapping'
388 m, n = p.map_attributes(tci_attributes, ignore_case=True)
389 if m == mci_result and n == nci_result:
393 print 'FAIL: Expected %s // %s\nObtained %s // %s' % \
394 (mci_result, nci_result, m, n)