3 # Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING
10 from urllib import urlencode
11 from requests_kerberos import HTTPKerberosAuth, OPTIONAL
class WrongPage(Exception):
    """Raised when a page does not contain the form a handler expects.

    The form handlers raise it so that the page fetcher can try the next
    handler instead of aborting.
    """
class PageTree(object):
    """HTML view of an HTTP response with XPath helper methods.

    ``result`` is a response object exposing ``text`` (the body) and
    ``url`` attributes. The body is parsed only when ``tree`` is first
    accessed and the parsed document is cached afterwards.
    """

    def __init__(self, result):
        self.result = result
        self.text = result.text
        # Filled in by the ``tree`` property on first access.
        self._tree = None

    @property
    def tree(self):
        """Return the parsed document, parsing ``self.text`` on first use."""
        if self._tree is None:
            self._tree = html.fromstring(self.text)
        return self._tree

    def first_value(self, rule):
        """Evaluate the XPath ``rule`` and return a single value.

        A list result yields its first item, or None when the list is
        empty; any non-list result is returned unchanged.
        """
        result = self.tree.xpath(rule)
        if isinstance(result, list):
            return result[0] if result else None
        return result

    def all_values(self, rule):
        """Evaluate the XPath ``rule`` and always return a list.

        A non-list result is wrapped in a one-element list.
        """
        result = self.tree.xpath(rule)
        if isinstance(result, list):
            return result
        return [result]

    def make_referer(self):
        """Return the URL of the wrapped response, for use as a referer."""
        return self.result.url

    def expected_value(self, rule, expected):
        """Check that the first value matched by ``rule`` equals ``expected``.

        Raises ValueError naming both values when they differ.
        """
        value = self.first_value(rule)
        if value != expected:
            raise ValueError("Expected [%s], got [%s]" % (expected, value))
class HttpSessions(object):
    """Registry of named servers, each with its own HTTP session.

    Every entry of ``self.servers`` is a dict holding ``baseuri``,
    ``session`` and, when supplied, ``user`` and ``pwd``. Requests are
    routed to the session whose ``baseuri`` prefixes the requested URL.
    """

    def __init__(self):
        self.servers = dict()

    def add_server(self, name, baseuri, user=None, pwd=None):
        """Register a server under ``name`` with a fresh session.

        ``user`` and ``pwd`` are stored only when they are truthy.
        """
        new = {'baseuri': baseuri,
               'session': requests.Session()}
        if user:
            new['user'] = user
        if pwd:
            new['pwd'] = pwd
        self.servers[name] = new

    def get_session(self, url):
        """Return the session of the first server whose baseuri prefixes url.

        Raises ValueError when no registered server matches.
        """
        for srv in self.servers.values():
            if url.startswith(srv['baseuri']):
                return srv['session']

        raise ValueError("Unknown URL: %s" % url)

    def get(self, url, krb=False, **kwargs):
        """Issue a GET on the session matching ``url``.

        Redirects are not followed unless ``krb`` is true, in which case
        Kerberos authentication is attached to the request as well.
        """
        session = self.get_session(url)
        allow_redirects = False
        if krb:
            # python-requests-kerberos isn't too bright about doing mutual
            # authentication and it tries to do it on any non-401 response
            # which doesn't work in our case since we follow redirects.
            kwargs['auth'] = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
            allow_redirects = True
        return session.get(url, allow_redirects=allow_redirects, **kwargs)

    def post(self, url, **kwargs):
        """Issue a POST on the session matching ``url`` without redirects."""
        session = self.get_session(url)
        return session.post(url, allow_redirects=False, **kwargs)

    def access(self, action, url, krb=False, **kwargs):
        """Dispatch to ``get`` or ``post`` by case-insensitive ``action``.

        ``krb`` is only forwarded to ``get``. Raises ValueError for any
        other action.
        """
        # str.lower() replaces string.lower(), which only exists on Python 2.
        action = action.lower()
        if action == 'get':
            return self.get(url, krb, **kwargs)
        if action == 'post':
            return self.post(url, **kwargs)
        raise ValueError("Unknown action type: [%s]" % action)

    def new_url(self, referer, action):
        """Resolve a form ``action`` against the ``referer`` URL.

        An action starting with '/' is combined with the referer's scheme
        and host; anything else is returned unchanged.
        """
        if action.startswith('/'):
            u = urlparse.urlparse(referer)
            return '%s://%s%s' % (u.scheme, u.netloc, action)
        return action

    def get_form_data(self, page, form_id, input_fields):
        """Collect the action, method and input attributes of a form.

        The form is selected by ``form_id`` when given, otherwise every
        ``//form`` matches. Returns ``[action, method, v1, v2, ...]`` where
        each ``vN`` is the list of values of the N-th attribute named in
        ``input_fields`` over the form's direct ``input`` children.
        """
        form_selector = '//form'
        if form_id:
            form_selector += '[@id="%s"]' % form_id
        values = [page.first_value('%s/@action' % form_selector),
                  page.first_value('%s/@method' % form_selector)]
        for field in input_fields:
            values.append(page.all_values('%s/input/@%s' % (form_selector,
                                                            field)))
        return values

    @staticmethod
    def _require_page(page):
        """Raise TypeError unless ``page`` is a PageTree."""
        if not isinstance(page, PageTree):
            raise TypeError("Expected PageTree object")

    def _form_request(self, page, form_id, error):
        """Build ``[method, url, kwargs]`` to submit a form found on ``page``.

        ``kwargs`` carries a referer header and a ``data`` payload mapping
        each input name to the value at the same position. Raises
        WrongPage(error) when the form cannot be extracted or has no action.
        """
        try:
            action_url, method, names, values = self.get_form_data(
                page, form_id, ["name", "value"])
        except Exception:  # pylint: disable=broad-except
            # Any extraction failure means this is not the expected page.
            raise WrongPage(error)
        if action_url is None:
            raise WrongPage(error)

        referer = page.make_referer()
        payload = {}
        # Indexing (rather than zip) keeps the IndexError raised when there
        # are fewer values than names.
        for i, name in enumerate(names):
            payload[name] = values[i]

        return [method, self.new_url(referer, action_url),
                {'headers': {'referer': referer}, 'data': payload}]

    def handle_login_form(self, idp, page):
        """Build the request submitting the ``login_form`` form on ``page``.

        The payload's ``login_name`` and ``login_password`` entries are set
        from the credentials registered for ``idp``. Raises TypeError for a
        non-PageTree page and WrongPage when the form is not present.
        """
        self._require_page(page)
        srv = self.servers[idp]
        request = self._form_request(page, "login_form",
                                     "Not a Login Form Page")

        # replace known values
        payload = request[2]['data']
        payload['login_name'] = srv['user']
        payload['login_password'] = srv['pwd']
        return request

    def handle_return_form(self, page):
        """Build the request submitting the ``saml-response`` form.

        Raises TypeError for a non-PageTree page and WrongPage when the
        form is not present.
        """
        self._require_page(page)
        return self._form_request(page, "saml-response",
                                  "Not a Return Form Page")

    def handle_openid_form(self, page):
        """Build the request submitting an OpenID autosubmit form.

        The page must be titled 'OpenID transaction in progress'; its form
        is selected without an id. Raises TypeError for a non-PageTree page
        and WrongPage otherwise.
        """
        self._require_page(page)
        title = page.first_value('//title/text()')
        if title != 'OpenID transaction in progress':
            raise WrongPage('Not OpenID autosubmit form')
        return self._form_request(page, None, "Not OpenID autosubmit form")

    def handle_openid_consent_form(self, page):
        """Build the request submitting the ``consent_form`` form.

        The payload's ``decided_allow`` entry is set to 'Allow'. Raises
        TypeError for a non-PageTree page and WrongPage when the form is
        not present.
        """
        self._require_page(page)
        request = self._form_request(page, "consent_form",
                                     "Not an OpenID Consent Form Page")

        # Replace known values
        request[2]['data']['decided_allow'] = 'Allow'
        return request

    def _next_form_request(self, idp, page, login_only=False):
        """Return the request of the first form handler accepting ``page``.

        Only the login handler is tried when ``login_only`` is true.
        Returns None when every handler raises WrongPage.
        """
        attempts = [(self.handle_login_form, (idp, page))]
        if not login_only:
            attempts.extend([
                (self.handle_return_form, (page,)),
                (self.handle_openid_consent_form, (page,)),
                (self.handle_openid_form, (page,)),
            ])
        for handler, handler_args in attempts:
            try:
                return handler(*handler_args)
            except WrongPage:
                continue
        return None

    def fetch_page(self, idp, target_url, follow_redirect=True, krb=False):
        """
        Fetch a page and parse the response code to determine what to do
        next.

        The login process consists of redirections (302/303) and
        potentially an unauthorized (401). For the case of unauthorized
        try the page returned in case of fallback authentication.

        Returns the PageTree of the final response. Raises ValueError for
        any status other than 200, 302, 303 or 401.
        """
        url = target_url
        action = 'get'
        args = {}

        while True:
            r = self.access(action, url, krb=krb, **args)
            status = r.status_code

            if status in (302, 303):
                if not follow_redirect:
                    return PageTree(r)
                url = r.headers['location']
                action = 'get'
                args = {}
                continue

            if status not in (200, 401):
                raise ValueError("Unhandled status (%d) on url %s" % (
                    status, url))

            page = PageTree(r)
            if status == 401:
                if r.headers.get('WWW-Authenticate', None) is None:
                    return page
                # Fall back, hopefully to testauth authentication.
                request = self._next_form_request(idp, page, login_only=True)
            else:
                request = self._next_form_request(idp, page)

            if request is None:
                # Either we got what we wanted, or we have to stop anyway.
                # This also covers a 401 without a login form, where
                # repeating the identical request could loop forever.
                return page
            (action, url, args) = request

    def auth_to_idp(self, idp, krb=False):
        """Log in to ``idp`` and check the welcome message for its user.

        Raises ValueError when the landing page is not served with status
        200 or when an expected page element does not match.
        """
        srv = self.servers[idp]
        target_url = '%s/%s/' % (srv['baseuri'], idp)

        r = self.access('get', target_url, krb=krb)
        if r.status_code != 200:
            raise ValueError("Access to idp failed: %s" % repr(r))

        page = PageTree(r)
        page.expected_value('//div[@id="content"]/p/a/text()', 'Log In')
        href = page.first_value('//div[@id="content"]/p/a/@href')
        url = self.new_url(target_url, href)

        page = self.fetch_page(idp, url, krb=krb)

        page.expected_value('//div[@id="welcome"]/p/text()',
                            'Welcome %s!' % srv['user'])

    def logout_from_idp(self, idp):
        """Request the logout URL of ``idp``; ValueError unless status 200."""
        srv = self.servers[idp]
        target_url = '%s/%s/logout' % (srv['baseuri'], idp)

        r = self.access('get', target_url)
        if r.status_code != 200:
            raise ValueError("Logout from idp failed: %s" % repr(r))

    def get_sp_metadata(self, idp, sp):
        """Return ``(idp base URI, response)`` for the SP's metadata URL.

        The metadata is fetched with a plain ``requests.get`` rather than
        through one of the registered sessions.
        """
        idpuri = self.servers[idp]['baseuri']
        spuri = self.servers[sp]['baseuri']

        return (idpuri, requests.get('%s/saml2/metadata' % spuri))

    def add_sp_metadata(self, idp, sp, rest=False):
        """Post the SP's metadata to the IdP.

        With ``rest`` the metadata is url-encoded and posted to the REST
        URL, expecting status 201. Otherwise it is uploaded as a file to
        the admin form, expecting status 200 and a success alert on the
        returned page. Raises ValueError on an unexpected status or page.
        """
        (idpuri, m) = self.get_sp_metadata(idp, sp)
        url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp)
        headers = {'referer': url}
        if rest:
            expected_status = 201
            payload = {'metadata': m.content}
            headers['content-type'] = 'application/x-www-form-urlencoded'
            url = '%s/%s/rest/providers/saml2/SPS/%s' % (idpuri, idp, sp)
            r = self.post(url, headers=headers, data=urlencode(payload))
        else:
            expected_status = 200
            metafile = {'metafile': m.content}
            payload = {'name': sp}
            r = self.post(url, headers=headers, data=payload, files=metafile)
        if r.status_code != expected_status:
            raise ValueError('Failed to post SP data [%s]' % repr(r))

        if not rest:
            page = PageTree(r)
            page.expected_value('//div[@class="alert alert-success"]/p/text()',
                                'SP Successfully added')

    def set_sp_default_nameids(self, idp, sp, nameids):
        """
        nameids is a list of Name ID formats to enable

        They are posted comma-separated to the SP's admin page on the IdP.
        Raises ValueError unless the response status is 200.
        """
        idpsrv = self.servers[idp]
        idpuri = idpsrv['baseuri']
        url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (idpuri, idp, sp)
        headers = {'referer': url,
                   'content-type': 'application/x-www-form-urlencoded'}
        payload = {'submit': 'Submit',
                   'allowed_nameids': ', '.join(nameids)}
        r = idpsrv['session'].post(url, headers=headers,
                                   data=payload)
        if r.status_code != 200:
            raise ValueError('Failed to post SP data [%s]' % repr(r))

    def set_attributes_and_mapping(self, idp, mapping=None, attrs=None,
                                   spname=None):
        """
        Set allowed attributes and mapping in the IDP or the SP. In the
        case of the SP both allowed attributes and the mapping need to
        be provided. An empty option for either means delete all values.

        mapping is a list of list of rules of the form:
           [['from-1', 'to-1'], ['from-2', 'to-2']]

        ex. [['*', '*'], ['fullname', 'namefull']]

        attrs is the list of attributes that will be allowed:
           ['fullname', 'givenname', 'surname']

        Omitting mapping or attrs (or passing None) is the same as
        passing an empty list. Raises ValueError unless the response
        status is 200.
        """
        idpsrv = self.servers[idp]
        idpuri = idpsrv['baseuri']
        if spname:  # per-SP setting
            url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (
                idpuri, idp, spname)
            mapname = 'Attribute Mapping'
            attrname = 'Allowed Attributes'
        else:  # global default
            url = '%s/%s/admin/providers/saml2' % (idpuri, idp)
            mapname = 'default attribute mapping'
            attrname = 'default allowed attributes'

        headers = {'referer': url,
                   'content-type': 'application/x-www-form-urlencoded'}
        payload = {'submit': 'Submit'}
        for count, rule in enumerate(mapping or ()):
            payload['%s %s-from' % (mapname, count)] = rule[0]
            payload['%s %s-to' % (mapname, count)] = rule[1]
        for count, attr in enumerate(attrs or ()):
            payload['%s %s-name' % (attrname, count)] = attr
        r = idpsrv['session'].post(url, headers=headers,
                                   data=payload)
        if r.status_code != 200:
            raise ValueError('Failed to post IDP data [%s]' % repr(r))

    def fetch_rest_page(self, idpname, uri):
        """
        idpname - the name of the IDP to fetch the page from
        uri - the URI of the page to retrieve

        The URL for the request is built from known-information in
        the session.

        returns dict if successful
        raises ValueError if the output is unparseable
        """
        baseurl = self.servers[idpname].get('baseuri')
        page = self.fetch_page(
            idpname,
            '%s%s' % (baseurl, uri)
        )
        return json.loads(page.text)

    def get_rest_sp(self, idpname, spname=None):
        """Fetch the REST listing of all SPs, or of ``spname`` when given."""
        if spname is None:
            uri = '/%s/rest/providers/saml2/SPS/' % idpname
        else:
            uri = '/%s/rest/providers/saml2/SPS/%s' % (idpname, spname)

        return self.fetch_rest_page(idpname, uri)