Pull the GSSAPI principal out of the userattrs
[cascardo/ipsilon.git] / tests / helpers / http.py
1 #!/usr/bin/python
2 #
3 # Copyright (C) 2014  Simo Sorce <simo@redhat.com>
4 #
5 # see file 'COPYING' for use and warranty information
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19
20
21 from lxml import html
22 import requests
23 import string
24 import urlparse
25 import json
26 from urllib import urlencode
27 from requests_kerberos import HTTPKerberosAuth, OPTIONAL
28
29
30 class WrongPage(Exception):
31     pass
32
33
34 class PageTree(object):
35
36     def __init__(self, result):
37         self.result = result
38         self.text = result.text
39         self._tree = None
40
41     @property
42     def tree(self):
43         if self._tree is None:
44             self._tree = html.fromstring(self.text)
45         return self._tree
46
47     def first_value(self, rule):
48         result = self.tree.xpath(rule)
49         if type(result) is list:
50             if len(result) > 0:
51                 result = result[0]
52             else:
53                 result = None
54         return result
55
56     def all_values(self, rule):
57         result = self.tree.xpath(rule)
58         if type(result) is list:
59             return result
60         return [result]
61
62     def make_referer(self):
63         return self.result.url
64
65     def expected_value(self, rule, expected):
66         value = self.first_value(rule)
67         if value != expected:
68             raise ValueError("Expected [%s], got [%s]" % (expected, value))
69
70
71 class HttpSessions(object):
72
73     def __init__(self):
74         self.servers = dict()
75
76     def add_server(self, name, baseuri, user=None, pwd=None):
77         new = {'baseuri': baseuri,
78                'session': requests.Session()}
79         if user:
80             new['user'] = user
81         if pwd:
82             new['pwd'] = pwd
83         self.servers[name] = new
84
85     def get_session(self, url):
86         for srv in self.servers:
87             d = self.servers[srv]
88             if url.startswith(d['baseuri']):
89                 return d['session']
90
91         raise ValueError("Unknown URL: %s" % url)
92
93     def get(self, url, krb=False, **kwargs):
94         session = self.get_session(url)
95         allow_redirects = False
96         if krb:
97             # python-requests-kerberos isn't too bright about doing mutual
98             # authentication and it tries to do it on any non-401 response
99             # which doesn't work in our case since we follow redirects.
100             kerberos_auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
101             kwargs['auth'] = kerberos_auth
102             allow_redirects = True
103         return session.get(url, allow_redirects=allow_redirects, **kwargs)
104
105     def post(self, url, **kwargs):
106         session = self.get_session(url)
107         return session.post(url, allow_redirects=False, **kwargs)
108
109     def access(self, action, url, krb=False, **kwargs):
110         action = string.lower(action)
111         if action == 'get':
112             return self.get(url, krb, **kwargs)
113         elif action == 'post':
114             return self.post(url, **kwargs)
115         else:
116             raise ValueError("Unknown action type: [%s]" % action)
117
118     def new_url(self, referer, action):
119         if action.startswith('/'):
120             u = urlparse.urlparse(referer)
121             return '%s://%s%s' % (u.scheme, u.netloc, action)
122         return action
123
124     def get_form_data(self, page, form_id, input_fields):
125         form_selector = '//form'
126         if form_id:
127             form_selector += '[@id="%s"]' % form_id
128         values = []
129         action = page.first_value('%s/@action' % form_selector)
130         values.append(action)
131         method = page.first_value('%s/@method' % form_selector)
132         values.append(method)
133         for field in input_fields:
134             value = page.all_values('%s/input/@%s' % (form_selector,
135                                                       field))
136             values.append(value)
137         return values
138
139     def handle_login_form(self, idp, page):
140         if type(page) != PageTree:
141             raise TypeError("Expected PageTree object")
142
143         srv = self.servers[idp]
144
145         try:
146             results = self.get_form_data(page, "login_form", ["name", "value"])
147             action_url = results[0]
148             method = results[1]
149             names = results[2]
150             values = results[3]
151             if action_url is None:
152                 raise Exception
153         except Exception:  # pylint: disable=broad-except
154             raise WrongPage("Not a Login Form Page")
155
156         referer = page.make_referer()
157         headers = {'referer': referer}
158         payload = {}
159         for i in range(0, len(names)):
160             payload[names[i]] = values[i]
161
162         # replace known values
163         payload['login_name'] = srv['user']
164         payload['login_password'] = srv['pwd']
165
166         return [method, self.new_url(referer, action_url),
167                 {'headers': headers, 'data': payload}]
168
169     def handle_return_form(self, page):
170         if type(page) != PageTree:
171             raise TypeError("Expected PageTree object")
172
173         try:
174             results = self.get_form_data(page, "saml-response",
175                                          ["name", "value"])
176             action_url = results[0]
177             if action_url is None:
178                 raise Exception
179             method = results[1]
180             names = results[2]
181             values = results[3]
182         except Exception:  # pylint: disable=broad-except
183             raise WrongPage("Not a Return Form Page")
184
185         referer = page.make_referer()
186         headers = {'referer': referer}
187
188         payload = {}
189         for i in range(0, len(names)):
190             payload[names[i]] = values[i]
191
192         return [method, self.new_url(referer, action_url),
193                 {'headers': headers, 'data': payload}]
194
195     def handle_openid_form(self, page):
196         if type(page) != PageTree:
197             raise TypeError("Expected PageTree object")
198
199         if not page.first_value('//title/text()') == \
200                 'OpenID transaction in progress':
201             raise WrongPage('Not OpenID autosubmit form')
202
203         try:
204             results = self.get_form_data(page, None,
205                                          ["name", "value"])
206             action_url = results[0]
207             if action_url is None:
208                 raise Exception
209             method = results[1]
210             names = results[2]
211             values = results[3]
212         except Exception:  # pylint: disable=broad-except
213             raise WrongPage("Not OpenID autosubmit form")
214
215         referer = page.make_referer()
216         headers = {'referer': referer}
217
218         payload = {}
219         for i in range(0, len(names)):
220             payload[names[i]] = values[i]
221
222         return [method, self.new_url(referer, action_url),
223                 {'headers': headers, 'data': payload}]
224
225     def handle_openid_consent_form(self, page):
226         if type(page) != PageTree:
227             raise TypeError("Expected PageTree object")
228
229         try:
230             results = self.get_form_data(page, "consent_form",
231                                          ['name', 'value'])
232             action_url = results[0]
233             if action_url is None:
234                 raise Exception
235             method = results[1]
236             names = results[2]
237             values = results[3]
238         except Exception:  # pylint: disable=broad-except
239             raise WrongPage("Not an OpenID Consent Form Page")
240
241         referer = page.make_referer()
242         headers = {'referer': referer}
243
244         payload = {}
245         for i in range(0, len(names)):
246             payload[names[i]] = values[i]
247
248         # Replace known values
249         payload['decided_allow'] = 'Allow'
250
251         return [method, self.new_url(referer, action_url),
252                 {'headers': headers, 'data': payload}]
253
254     def fetch_page(self, idp, target_url, follow_redirect=True, krb=False):
255         """
256         Fetch a page and parse the response code to determine what to do
257         next.
258
259         The login process consists of redirections (302/303) and
260         potentially an unauthorized (401). For the case of unauthorized
261         try the page returned in case of fallback authentication.
262         """
263         url = target_url
264         action = 'get'
265         args = {}
266
267         while True:
268             # pylint: disable=star-args
269             r = self.access(action, url, krb=krb, **args)
270             if r.status_code == 303 or r.status_code == 302:
271                 if not follow_redirect:
272                     return PageTree(r)
273                 url = r.headers['location']
274                 action = 'get'
275                 args = {}
276             elif r.status_code == 401:
277                 page = PageTree(r)
278                 if r.headers.get('WWW-Authenticate', None) is None:
279                     return page
280
281                 # Fall back, hopefully to testauth authentication.
282                 try:
283                     (action, url, args) = self.handle_login_form(idp, page)
284                     continue
285                 except WrongPage:
286                     pass
287             elif r.status_code == 200:
288                 page = PageTree(r)
289
290                 try:
291                     (action, url, args) = self.handle_login_form(idp, page)
292                     continue
293                 except WrongPage:
294                     pass
295
296                 try:
297                     (action, url, args) = self.handle_return_form(page)
298                     continue
299                 except WrongPage:
300                     pass
301
302                 try:
303                     (action, url, args) = self.handle_openid_consent_form(page)
304                     continue
305                 except WrongPage:
306                     pass
307
308                 try:
309                     (action, url, args) = self.handle_openid_form(page)
310                     continue
311                 except WrongPage:
312                     pass
313
314                 # Either we got what we wanted, or we have to stop anyway
315                 return page
316             else:
317                 raise ValueError("Unhandled status (%d) on url %s" % (
318                                  r.status_code, url))
319
320     def auth_to_idp(self, idp, krb=False):
321
322         srv = self.servers[idp]
323         target_url = '%s/%s/' % (srv['baseuri'], idp)
324
325         r = self.access('get', target_url, krb=krb)
326         if r.status_code != 200:
327             raise ValueError("Access to idp failed: %s" % repr(r))
328
329         page = PageTree(r)
330         page.expected_value('//div[@id="content"]/p/a/text()', 'Log In')
331         href = page.first_value('//div[@id="content"]/p/a/@href')
332         url = self.new_url(target_url, href)
333
334         page = self.fetch_page(idp, url, krb=krb)
335
336         page.expected_value('//div[@id="welcome"]/p/text()',
337                             'Welcome %s!' % srv['user'])
338
339     def logout_from_idp(self, idp):
340
341         srv = self.servers[idp]
342         target_url = '%s/%s/logout' % (srv['baseuri'], idp)
343
344         r = self.access('get', target_url)
345         if r.status_code != 200:
346             raise ValueError("Logout from idp failed: %s" % repr(r))
347
348     def get_sp_metadata(self, idp, sp):
349         idpsrv = self.servers[idp]
350         idpuri = idpsrv['baseuri']
351
352         spuri = self.servers[sp]['baseuri']
353
354         return (idpuri, requests.get('%s/saml2/metadata' % spuri))
355
356     def add_sp_metadata(self, idp, sp, rest=False):
357         expected_status = 200
358         (idpuri, m) = self.get_sp_metadata(idp, sp)
359         url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp)
360         headers = {'referer': url}
361         if rest:
362             expected_status = 201
363             payload = {'metadata': m.content}
364             headers['content-type'] = 'application/x-www-form-urlencoded'
365             url = '%s/%s/rest/providers/saml2/SPS/%s' % (idpuri, idp, sp)
366             r = self.post(url, headers=headers, data=urlencode(payload))
367         else:
368             metafile = {'metafile': m.content}
369             payload = {'name': sp}
370             r = self.post(url, headers=headers, data=payload, files=metafile)
371         if r.status_code != expected_status:
372             raise ValueError('Failed to post SP data [%s]' % repr(r))
373
374         if not rest:
375             page = PageTree(r)
376             page.expected_value('//div[@class="alert alert-success"]/p/text()',
377                                 'SP Successfully added')
378
379     def set_sp_default_nameids(self, idp, sp, nameids):
380         """
381         nameids is a list of Name ID formats to enable
382         """
383         idpsrv = self.servers[idp]
384         idpuri = idpsrv['baseuri']
385         url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (idpuri, idp, sp)
386         headers = {'referer': url}
387         headers['content-type'] = 'application/x-www-form-urlencoded'
388         payload = {'submit': 'Submit',
389                    'allowed_nameids': ', '.join(nameids)}
390         r = idpsrv['session'].post(url, headers=headers,
391                                    data=payload)
392         if r.status_code != 200:
393             raise ValueError('Failed to post SP data [%s]' % repr(r))
394
395     # pylint: disable=dangerous-default-value
396     def set_attributes_and_mapping(self, idp, mapping=[], attrs=[],
397                                    spname=None):
398         """
399         Set allowed attributes and mapping in the IDP or the SP. In the
400         case of the SP both allowed attributes and the mapping need to
401         be provided. An empty option for either means delete all values.
402
403         mapping is a list of list of rules of the form:
404            [['from-1', 'to-1'], ['from-2', 'from-2']]
405
406         ex. [['*', '*'], ['fullname', 'namefull']]
407
408         attrs is the list of attributes that will be allowed:
409            ['fullname', 'givenname', 'surname']
410         """
411         idpsrv = self.servers[idp]
412         idpuri = idpsrv['baseuri']
413         if spname:  # per-SP setting
414             url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (
415                 idpuri, idp, spname)
416             mapname = 'Attribute Mapping'
417             attrname = 'Allowed Attributes'
418         else:  # global default
419             url = '%s/%s/admin/providers/saml2' % (idpuri, idp)
420             mapname = 'default attribute mapping'
421             attrname = 'default allowed attributes'
422
423         headers = {'referer': url}
424         headers['content-type'] = 'application/x-www-form-urlencoded'
425         payload = {'submit': 'Submit'}
426         count = 0
427         for m in mapping:
428             payload['%s %s-from' % (mapname, count)] = m[0]
429             payload['%s %s-to' % (mapname, count)] = m[1]
430             count += 1
431         count = 0
432         for attr in attrs:
433             payload['%s %s-name' % (attrname, count)] = attr
434             count += 1
435         r = idpsrv['session'].post(url, headers=headers,
436                                    data=payload)
437         if r.status_code != 200:
438             raise ValueError('Failed to post IDP data [%s]' % repr(r))
439
440     def fetch_rest_page(self, idpname, uri):
441         """
442         idpname - the name of the IDP to fetch the page from
443         uri - the URI of the page to retrieve
444
445         The URL for the request is built from known-information in
446         the session.
447
448         returns dict if successful
449         returns ValueError if the output is unparseable
450         """
451         baseurl = self.servers[idpname].get('baseuri')
452         page = self.fetch_page(
453             idpname,
454             '%s%s' % (baseurl, uri)
455         )
456         return json.loads(page.text)
457
458     def get_rest_sp(self, idpname, spname=None):
459         if spname is None:
460             uri = '/%s/rest/providers/saml2/SPS/' % idpname
461         else:
462             uri = '/%s/rest/providers/saml2/SPS/%s' % (idpname, spname)
463
464         return self.fetch_rest_page(idpname, uri)