# Copyright (C) 2014 Simo Sorce <simo@redhat.com>
#
# see file 'COPYING' for use and warranty information
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
26 from urllib import urlencode
27 from requests_kerberos import HTTPKerberosAuth, OPTIONAL
class WrongPage(Exception):
    """Raised by the form handlers when a page is not the expected form."""
class PageTree(object):
    """Lazily parsed HTML view over an HTTP response.

    ``result`` must expose ``text`` (the page body) and ``url`` attributes;
    the body is parsed with ``html.fromstring`` only when first queried.
    """

    def __init__(self, result):
        self.result = result
        self.text = result.text
        # Filled in by the ``tree`` property on first access.
        self._tree = None

    @property
    def tree(self):
        """Return the parsed document, parsing ``self.text`` on first use."""
        if self._tree is None:
            self._tree = html.fromstring(self.text)
        return self._tree

    def first_value(self, rule):
        """Evaluate the XPath ``rule`` and return a single value.

        A list result yields its first element, or None when empty; any
        other result is returned unchanged.
        """
        result = self.tree.xpath(rule)
        if isinstance(result, list):
            return result[0] if result else None
        return result

    def all_values(self, rule):
        """Evaluate the XPath ``rule`` and always return a list.

        A non-list result is wrapped in a one-element list.
        """
        result = self.tree.xpath(rule)
        if isinstance(result, list):
            return result
        return [result]

    def make_referer(self):
        """Return the URL of the underlying response, for use as a referer."""
        return self.result.url

    def expected_value(self, rule, expected):
        """Check that the first value matched by ``rule`` equals ``expected``.

        Raises ValueError when the values differ.
        """
        value = self.first_value(rule)
        if value != expected:
            raise ValueError("Expected [%s], got [%s]" % (expected, value))
class HttpSessions(object):
    """Registry of per-server HTTP sessions plus helpers that walk the
    IdP login, SAML return and OpenID form flows.

    ``self.servers`` maps a server name to a dict holding ``baseuri``,
    ``session`` and, when they were supplied, ``user`` and ``pwd``.
    """

    def __init__(self):
        self.servers = dict()

    def add_server(self, name, baseuri, user=None, pwd=None):
        """Register server ``name`` with a fresh ``requests.Session``.

        ``user`` and ``pwd`` are stored only when they are truthy.
        """
        new = {'baseuri': baseuri,
               'session': requests.Session()}
        if user:
            new['user'] = user
        if pwd:
            new['pwd'] = pwd
        self.servers[name] = new

    def get_session(self, url):
        """Return the session of the first server whose base URI prefixes
        ``url``.

        Raises ValueError when no registered server matches.
        """
        for srv in self.servers.values():
            if url.startswith(srv['baseuri']):
                return srv['session']

        raise ValueError("Unknown URL: %s" % url)

    def get(self, url, krb=False, **kwargs):
        """Issue a GET on the session owning ``url``.

        Redirects are not followed unless ``krb`` is set, in which case
        Kerberos authentication is attached and redirects are allowed.
        """
        session = self.get_session(url)
        allow_redirects = False
        if krb:
            # In at least the test instance we don't get back a negotiate
            # blob to do mutual authentication against.
            kerberos_auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
            kwargs['auth'] = kerberos_auth
            allow_redirects = True
        return session.get(url, allow_redirects=allow_redirects, **kwargs)

    def post(self, url, **kwargs):
        """Issue a POST on the session owning ``url`` without following
        redirects."""
        session = self.get_session(url)
        return session.post(url, allow_redirects=False, **kwargs)

    def access(self, action, url, krb=False, **kwargs):
        """Dispatch to ``get`` or ``post`` by the case-insensitive ``action``.

        Raises ValueError for any other action.
        """
        # str.lower() works on both Python 2 and 3; string.lower() is
        # Python 2 only.
        action = action.lower()
        if action == 'get':
            return self.get(url, krb, **kwargs)
        elif action == 'post':
            return self.post(url, **kwargs)
        else:
            raise ValueError("Unknown action type: [%s]" % action)

    def new_url(self, referer, action):
        """Resolve a form ``action`` against ``referer``.

        An action starting with '/' is joined to the referer's scheme and
        host; anything else is returned unchanged.
        """
        if action.startswith('/'):
            u = urlparse.urlparse(referer)
            return '%s://%s%s' % (u.scheme, u.netloc, action)
        return action

    def get_form_data(self, page, form_id, input_fields):
        """Collect a form's action, method and input attributes.

        Returns ``[action, method, values_1, ..., values_n]`` where each
        ``values_i`` is the list of the i-th attribute in ``input_fields``
        over the form's direct ``input`` children. When ``form_id`` is
        falsy every form on the page is matched.
        """
        form_selector = '//form'
        if form_id:
            form_selector += '[@id="%s"]' % form_id
        values = [page.first_value('%s/@action' % form_selector),
                  page.first_value('%s/@method' % form_selector)]
        for field in input_fields:
            values.append(page.all_values('%s/input/@%s' % (form_selector,
                                                            field)))
        return values

    @staticmethod
    def _require_page(page):
        """Raise TypeError unless ``page`` is a PageTree."""
        if not isinstance(page, PageTree):
            raise TypeError("Expected PageTree object")

    def _parse_form(self, page, form_id, error):
        """Return (action_url, method, names, values) for the form, or raise
        WrongPage(error) when it cannot be read or has no action."""
        try:
            action_url, method, names, values = self.get_form_data(
                page, form_id, ["name", "value"])
        except Exception:  # pylint: disable=broad-except
            # Any failure while reading the form means this is not the
            # page the caller was looking for.
            raise WrongPage(error)
        if action_url is None:
            raise WrongPage(error)
        return action_url, method, names, values

    def _form_request(self, page, form, overrides=None):
        """Build ``[method, url, {'headers': ..., 'data': ...}]`` from a
        parsed form, applying ``overrides`` on top of the form's values."""
        action_url, method, names, values = form
        referer = page.make_referer()
        # Indexing (rather than zip) keeps the IndexError when an input has
        # a name but no matching value.
        payload = dict((name, values[i]) for i, name in enumerate(names))
        if overrides:
            payload.update(overrides)
        return [method, self.new_url(referer, action_url),
                {'headers': {'referer': referer}, 'data': payload}]

    def handle_login_form(self, idp, page):
        """Build the request that submits the ``login_form`` on ``page``
        with the credentials registered for ``idp``.

        Raises TypeError if ``page`` is not a PageTree and WrongPage if the
        page holds no login form.
        """
        self._require_page(page)
        srv = self.servers[idp]
        form = self._parse_form(page, "login_form", "Not a Login Form Page")
        # replace known values
        credentials = {'login_name': srv['user'],
                       'login_password': srv['pwd']}
        return self._form_request(page, form, credentials)

    def handle_return_form(self, page):
        """Build the request that submits the ``saml-response`` form.

        Raises TypeError if ``page`` is not a PageTree and WrongPage if the
        page holds no such form.
        """
        self._require_page(page)
        form = self._parse_form(page, "saml-response",
                                "Not a Return Form Page")
        return self._form_request(page, form)

    def handle_openid_form(self, page):
        """Build the request that submits the OpenID autosubmit form.

        The page is recognised by its title; the form itself has no id.
        Raises TypeError if ``page`` is not a PageTree and WrongPage if the
        page is not the autosubmit page.
        """
        self._require_page(page)
        title = page.first_value('//title/text()')
        if title != 'OpenID transaction in progress':
            raise WrongPage('Not OpenID autosubmit form')
        form = self._parse_form(page, None, "Not OpenID autosubmit form")
        return self._form_request(page, form)

    def handle_openid_consent_form(self, page):
        """Build the request that approves the OpenID ``consent_form``.

        Raises TypeError if ``page`` is not a PageTree and WrongPage if the
        page holds no consent form.
        """
        self._require_page(page)
        form = self._parse_form(page, "consent_form",
                                "Not an OpenID Consent Form Page")
        # Replace known values
        return self._form_request(page, form, {'decided_allow': 'Allow'})

    def _next_form_request(self, idp, page):
        """Return the request for the first known form found on ``page``
        (login, SAML return, OpenID consent, OpenID autosubmit, in that
        order), or None when the page holds none of them."""
        handlers = (
            lambda: self.handle_login_form(idp, page),
            lambda: self.handle_return_form(page),
            lambda: self.handle_openid_consent_form(page),
            lambda: self.handle_openid_form(page),
        )
        for handler in handlers:
            try:
                return handler()
            except WrongPage:
                continue
        return None

    def fetch_page(self, idp, target_url, follow_redirect=True, krb=False):
        """
        Fetch a page and parse the response code to determine what to do
        next.

        The login process consists of redirections (302/303) and
        potentially an unauthorized (401). For the case of unauthorized
        try the page returned in case of fallback authentication.

        Returns the final PageTree. Raises ValueError on any status other
        than 200, 302, 303 or 401.
        """
        url = target_url
        action = 'get'
        args = {}

        while True:
            # pylint: disable=star-args
            r = self.access(action, url, krb=krb, **args)
            if r.status_code in (302, 303):
                if not follow_redirect:
                    return PageTree(r)
                url = r.headers['location']
                action = 'get'
                args = {}
            elif r.status_code == 401:
                page = PageTree(r)
                if r.headers.get('WWW-Authenticate') is None:
                    return page

                # Fall back, hopefully to testauth authentication.
                try:
                    (action, url, args) = self.handle_login_form(idp, page)
                except WrongPage:
                    # No login form to fall back to: re-sending the same
                    # request would only produce the same 401 forever.
                    return page
            elif r.status_code == 200:
                page = PageTree(r)
                request = self._next_form_request(idp, page)
                if request is None:
                    # Either we got what we wanted, or we have to stop anyway
                    return page
                (action, url, args) = request
            else:
                raise ValueError("Unhandled status (%d) on url %s" % (
                                 r.status_code, url))

    def auth_to_idp(self, idp, krb=False):
        """Log in to ``idp`` and verify the welcome message for its user.

        Raises ValueError if the IdP landing page cannot be fetched or an
        expected page element is missing.
        """
        srv = self.servers[idp]
        target_url = '%s/%s/' % (srv['baseuri'], idp)

        r = self.access('get', target_url, krb=krb)
        if r.status_code != 200:
            raise ValueError("Access to idp failed: %s" % repr(r))

        page = PageTree(r)
        page.expected_value('//div[@id="content"]/p/a/text()', 'Log In')
        href = page.first_value('//div[@id="content"]/p/a/@href')
        url = self.new_url(target_url, href)

        page = self.fetch_page(idp, url, krb=krb)

        page.expected_value('//div[@id="welcome"]/p/text()',
                            'Welcome %s!' % srv['user'])

    def logout_from_idp(self, idp):
        """GET the logout URL of ``idp``; raise ValueError unless it
        answers 200."""
        srv = self.servers[idp]
        target_url = '%s/%s/logout' % (srv['baseuri'], idp)

        r = self.access('get', target_url)
        if r.status_code != 200:
            raise ValueError("Logout from idp failed: %s" % repr(r))

    def get_sp_metadata(self, idp, sp):
        """Return ``(idp base URI, response)`` where the response is a
        session-less GET of the SP's ``/saml2/metadata``."""
        idpuri = self.servers[idp]['baseuri']
        spuri = self.servers[sp]['baseuri']

        return (idpuri, requests.get('%s/saml2/metadata' % spuri))

    def add_sp_metadata(self, idp, sp, rest=False):
        """Register the metadata of ``sp`` with ``idp``.

        With ``rest`` the metadata is form-encoded and posted to the REST
        endpoint (expects 201); otherwise it is uploaded as a file to the
        admin form (expects 200 and a success alert on the page).
        Raises ValueError on an unexpected status or missing alert.
        """
        expected_status = 200
        (idpuri, m) = self.get_sp_metadata(idp, sp)
        url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp)
        headers = {'referer': url}
        if rest:
            expected_status = 201
            payload = {'metadata': m.content}
            headers['content-type'] = 'application/x-www-form-urlencoded'
            url = '%s/%s/rest/providers/saml2/SPS/%s' % (idpuri, idp, sp)
            r = self.post(url, headers=headers, data=urlencode(payload))
        else:
            metafile = {'metafile': m.content}
            payload = {'name': sp}
            r = self.post(url, headers=headers, data=payload, files=metafile)
        if r.status_code != expected_status:
            raise ValueError('Failed to post SP data [%s]' % repr(r))

        if not rest:
            page = PageTree(r)
            page.expected_value('//div[@class="alert alert-success"]/p/text()',
                                'SP Successfully added')

    def set_sp_default_nameids(self, idp, sp, nameids):
        """
        nameids is a list of Name ID formats to enable

        Raises ValueError unless the IdP answers 200.
        """
        idpsrv = self.servers[idp]
        idpuri = idpsrv['baseuri']
        url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (idpuri, idp, sp)
        headers = {'referer': url}
        headers['content-type'] = 'application/x-www-form-urlencoded'
        payload = {'submit': 'Submit',
                   'allowed_nameids': ', '.join(nameids)}
        r = idpsrv['session'].post(url, headers=headers,
                                   data=payload)
        if r.status_code != 200:
            raise ValueError('Failed to post SP data [%s]' % repr(r))

    def set_attributes_and_mapping(self, idp, mapping=None, attrs=None,
                                   spname=None):
        """
        Set allowed attributes and mapping in the IDP or the SP. In the
        case of the SP both allowed attributes and the mapping need to
        be provided. An empty option for either means delete all values.

        mapping is a list of list of rules of the form:
           [['from-1', 'to-1'], ['from-2', 'from-2']]

        ex. [['*', '*'], ['fullname', 'namefull']]

        attrs is the list of attributes that will be allowed:
           ['fullname', 'givenname', 'surname']

        Omitting mapping or attrs (None) is the same as passing an empty
        list. Raises ValueError unless the IdP answers 200.
        """
        idpsrv = self.servers[idp]
        idpuri = idpsrv['baseuri']
        if spname:  # per-SP setting
            url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (
                idpuri, idp, spname)
            mapname = 'Attribute Mapping'
            attrname = 'Allowed Attributes'
        else:  # global default
            url = '%s/%s/admin/providers/saml2' % (idpuri, idp)
            mapname = 'default attribute mapping'
            attrname = 'default allowed attributes'

        headers = {'referer': url}
        headers['content-type'] = 'application/x-www-form-urlencoded'
        payload = {'submit': 'Submit'}
        for count, m in enumerate(mapping or ()):
            payload['%s %s-from' % (mapname, count)] = m[0]
            payload['%s %s-to' % (mapname, count)] = m[1]
        for count, attr in enumerate(attrs or ()):
            payload['%s %s-name' % (attrname, count)] = attr
        r = idpsrv['session'].post(url, headers=headers,
                                   data=payload)
        if r.status_code != 200:
            raise ValueError('Failed to post IDP data [%s]' % repr(r))

    def fetch_rest_page(self, idpname, uri):
        """
        idpname - the name of the IDP to fetch the page from
        uri - the URI of the page to retrieve

        The URL for the request is built from the base URI registered
        for idpname.

        returns the decoded JSON body if successful
        raises ValueError if the output is unparseable
        """
        baseurl = self.servers[idpname].get('baseuri')
        page = self.fetch_page(
            idpname,
            '%s%s' % (baseurl, uri)
        )
        return json.loads(page.text)

    def get_rest_sp(self, idpname, spname=None):
        """Fetch the REST description of one SAML2 SP, or of all of them
        when ``spname`` is None."""
        if spname is None:
            uri = '/%s/rest/providers/saml2/SPS/' % idpname
        else:
            uri = '/%s/rest/providers/saml2/SPS/%s' % (idpname, spname)

        return self.fetch_rest_page(idpname, uri)