3 # Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING
10 from urllib import urlencode
11 from requests_kerberos import HTTPKerberosAuth, OPTIONAL
14 class WrongPage(Exception):
18 class PageTree(object):
20 def __init__(self, result):
22 self.text = result.text
27 if self._tree is None:
28 self._tree = html.fromstring(self.text)
31 def first_value(self, rule):
32 result = self.tree.xpath(rule)
33 if isinstance(result, list):
40 def all_values(self, rule):
41 result = self.tree.xpath(rule)
42 if isinstance(result, list):
46 def make_referer(self):
47 return self.result.url
49 def expected_value(self, rule, expected):
50 value = self.first_value(rule)
52 raise ValueError("Expected [%s], got [%s]" % (expected, value))
55 class HttpSessions(object):
60 def add_server(self, name, baseuri, user=None, pwd=None):
61 new = {'baseuri': baseuri,
62 'session': requests.Session()}
67 self.servers[name] = new
69 def get_session(self, url):
70 for srv in self.servers:
72 if url.startswith(d['baseuri']):
75 raise ValueError("Unknown URL: %s" % url)
77 def get(self, url, krb=False, **kwargs):
78 session = self.get_session(url)
79 allow_redirects = False
81 # python-requests-kerberos isn't too bright about doing mutual
82 # authentication and it tries to do it on any non-401 response
83 # which doesn't work in our case since we follow redirects.
84 kerberos_auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
85 kwargs['auth'] = kerberos_auth
86 allow_redirects = True
87 return session.get(url, allow_redirects=allow_redirects, **kwargs)
89 def post(self, url, **kwargs):
90 session = self.get_session(url)
91 return session.post(url, allow_redirects=False, **kwargs)
93 def access(self, action, url, krb=False, **kwargs):
94 action = string.lower(action)
96 return self.get(url, krb, **kwargs)
97 elif action == 'post':
98 return self.post(url, **kwargs)
100 raise ValueError("Unknown action type: [%s]" % action)
102 def new_url(self, referer, action):
103 if action.startswith('/'):
104 u = urlparse.urlparse(referer)
105 return '%s://%s%s' % (u.scheme, u.netloc, action)
108 def get_form_data(self, page, form_id, input_fields):
109 form_selector = '//form'
111 form_selector += '[@id="%s"]' % form_id
113 action = page.first_value('%s/@action' % form_selector)
114 values.append(action)
115 method = page.first_value('%s/@method' % form_selector)
116 values.append(method)
117 for field in input_fields:
118 value = page.all_values('%s/input/@%s' % (form_selector,
123 def handle_login_form(self, idp, page):
124 if not isinstance(page, PageTree):
125 raise TypeError("Expected PageTree object")
127 srv = self.servers[idp]
130 results = self.get_form_data(page, "login_form", ["name", "value"])
131 action_url = results[0]
135 if action_url is None:
137 except Exception: # pylint: disable=broad-except
138 raise WrongPage("Not a Login Form Page")
140 referer = page.make_referer()
141 headers = {'referer': referer}
143 for i in range(0, len(names)):
144 payload[names[i]] = values[i]
146 # replace known values
147 payload['login_name'] = srv['user']
148 payload['login_password'] = srv['pwd']
150 return [method, self.new_url(referer, action_url),
151 {'headers': headers, 'data': payload}]
153 def handle_return_form(self, page):
154 if not isinstance(page, PageTree):
155 raise TypeError("Expected PageTree object")
158 results = self.get_form_data(page, "saml-response",
160 action_url = results[0]
161 if action_url is None:
166 except Exception: # pylint: disable=broad-except
167 raise WrongPage("Not a Return Form Page")
169 referer = page.make_referer()
170 headers = {'referer': referer}
173 for i in range(0, len(names)):
174 payload[names[i]] = values[i]
176 return [method, self.new_url(referer, action_url),
177 {'headers': headers, 'data': payload}]
179 def handle_openid_form(self, page):
180 if not isinstance(page, PageTree):
181 raise TypeError("Expected PageTree object")
183 if not page.first_value('//title/text()') == \
184 'OpenID transaction in progress':
185 raise WrongPage('Not OpenID autosubmit form')
188 results = self.get_form_data(page, None,
190 action_url = results[0]
191 if action_url is None:
196 except Exception: # pylint: disable=broad-except
197 raise WrongPage("Not OpenID autosubmit form")
199 referer = page.make_referer()
200 headers = {'referer': referer}
203 for i in range(0, len(names)):
204 payload[names[i]] = values[i]
206 return [method, self.new_url(referer, action_url),
207 {'headers': headers, 'data': payload}]
209 def handle_openid_consent_form(self, page):
210 if not isinstance(page, PageTree):
211 raise TypeError("Expected PageTree object")
214 results = self.get_form_data(page, "consent_form",
216 action_url = results[0]
217 if action_url is None:
222 except Exception: # pylint: disable=broad-except
223 raise WrongPage("Not an OpenID Consent Form Page")
225 referer = page.make_referer()
226 headers = {'referer': referer}
229 for i in range(0, len(names)):
230 payload[names[i]] = values[i]
232 # Replace known values
233 payload['decided_allow'] = 'Allow'
235 return [method, self.new_url(referer, action_url),
236 {'headers': headers, 'data': payload}]
238 def fetch_page(self, idp, target_url, follow_redirect=True, krb=False):
240 Fetch a page and parse the response code to determine what to do
243 The login process consists of redirections (302/303) and
244 potentially an unauthorized (401). For the case of unauthorized
245 try the page returned in case of fallback authentication.
252 r = self.access(action, url, krb=krb, **args)
253 if r.status_code == 303 or r.status_code == 302:
254 if not follow_redirect:
256 url = r.headers['location']
259 elif r.status_code == 401:
261 if r.headers.get('WWW-Authenticate', None) is None:
264 # Fall back, hopefully to testauth authentication.
266 (action, url, args) = self.handle_login_form(idp, page)
270 elif r.status_code == 200:
274 (action, url, args) = self.handle_login_form(idp, page)
280 (action, url, args) = self.handle_return_form(page)
286 (action, url, args) = self.handle_openid_consent_form(page)
292 (action, url, args) = self.handle_openid_form(page)
297 # Either we got what we wanted, or we have to stop anyway
300 raise ValueError("Unhandled status (%d) on url %s" % (
303 def auth_to_idp(self, idp, krb=False, rule=None, expected=None):
305 srv = self.servers[idp]
306 target_url = '%s/%s/' % (srv['baseuri'], idp)
308 r = self.access('get', target_url, krb=krb)
309 if r.status_code != 200:
310 raise ValueError("Access to idp failed: %s" % repr(r))
313 page.expected_value('//div[@id="content"]/p/a/text()', 'Log In')
314 href = page.first_value('//div[@id="content"]/p/a/@href')
315 url = self.new_url(target_url, href)
317 page = self.fetch_page(idp, url, krb=krb)
320 rule = '//div[@id="welcome"]/p/text()'
322 expected = 'Welcome %s!' % srv['user']
324 page.expected_value(rule, expected)
326 def logout_from_idp(self, idp):
328 srv = self.servers[idp]
329 target_url = '%s/%s/logout' % (srv['baseuri'], idp)
331 r = self.access('get', target_url)
332 if r.status_code != 200:
333 raise ValueError("Logout from idp failed: %s" % repr(r))
335 def get_sp_metadata(self, idp, sp):
336 idpsrv = self.servers[idp]
337 idpuri = idpsrv['baseuri']
339 spuri = self.servers[sp]['baseuri']
341 return (idpuri, requests.get('%s/saml2/metadata' % spuri))
343 def add_sp_metadata(self, idp, sp, rest=False):
344 expected_status = 200
345 (idpuri, m) = self.get_sp_metadata(idp, sp)
346 url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp)
347 headers = {'referer': url}
349 expected_status = 201
351 'metadata': m.content,
355 'splink': 'http://test.example.com/secret/',
357 headers['content-type'] = 'application/x-www-form-urlencoded'
358 url = '%s/%s/rest/providers/saml2/SPS/%s' % (idpuri, idp, sp)
359 r = self.post(url, headers=headers, data=urlencode(payload))
361 metafile = {'metafile': m.content}
362 payload = {'name': sp}
363 r = self.post(url, headers=headers, data=payload, files=metafile)
364 if r.status_code != expected_status:
365 raise ValueError('Failed to post SP data [%s]' % repr(r))
369 page.expected_value('//div[@class="alert alert-success"]/p/text()',
370 'SP Successfully added')
372 def set_sp_default_nameids(self, idp, sp, nameids):
374 nameids is a list of Name ID formats to enable
376 idpsrv = self.servers[idp]
377 idpuri = idpsrv['baseuri']
378 url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (idpuri, idp, sp)
379 headers = {'referer': url}
380 headers['content-type'] = 'application/x-www-form-urlencoded'
381 payload = {'submit': 'Submit',
382 'allowed_nameids': ', '.join(nameids)}
383 r = idpsrv['session'].post(url, headers=headers,
385 if r.status_code != 200:
386 raise ValueError('Failed to post SP data [%s]' % repr(r))
388 # pylint: disable=dangerous-default-value
389 def set_attributes_and_mapping(self, idp, mapping=[], attrs=[],
392 Set allowed attributes and mapping in the IDP or the SP. In the
393 case of the SP both allowed attributes and the mapping need to
394 be provided. An empty option for either means delete all values.
396 mapping is a list of list of rules of the form:
397 [['from-1', 'to-1'], ['from-2', 'from-2']]
399 ex. [['*', '*'], ['fullname', 'namefull']]
401 attrs is the list of attributes that will be allowed:
402 ['fullname', 'givenname', 'surname']
404 idpsrv = self.servers[idp]
405 idpuri = idpsrv['baseuri']
406 if spname: # per-SP setting
407 url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (
409 mapname = 'Attribute Mapping'
410 attrname = 'Allowed Attributes'
411 else: # global default
412 url = '%s/%s/admin/providers/saml2' % (idpuri, idp)
413 mapname = 'default attribute mapping'
414 attrname = 'default allowed attributes'
416 headers = {'referer': url}
417 headers['content-type'] = 'application/x-www-form-urlencoded'
418 payload = {'submit': 'Submit'}
421 payload['%s %s-from' % (mapname, count)] = m[0]
422 payload['%s %s-to' % (mapname, count)] = m[1]
426 payload['%s %s-name' % (attrname, count)] = attr
428 r = idpsrv['session'].post(url, headers=headers,
430 if r.status_code != 200:
431 raise ValueError('Failed to post IDP data [%s]' % repr(r))
433 def fetch_rest_page(self, idpname, uri):
435 idpname - the name of the IDP to fetch the page from
436 uri - the URI of the page to retrieve
438 The URL for the request is built from known-information in
441 returns dict if successful
442 returns ValueError if the output is unparseable
444 baseurl = self.servers[idpname].get('baseuri')
445 page = self.fetch_page(
447 '%s%s' % (baseurl, uri)
449 return json.loads(page.text)
451 def get_rest_sp(self, idpname, spname=None):
453 uri = '/%s/rest/providers/saml2/SPS/' % idpname
455 uri = '/%s/rest/providers/saml2/SPS/%s' % (idpname, spname)
457 return self.fetch_rest_page(idpname, uri)