0643fb16817086b8eafdb2964d9775df9d9d6781
[cascardo/ipsilon.git] / tests / helpers / http.py
1 #!/usr/bin/python
2 #
3 # Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING
4
5 from lxml import html
6 import requests
7 import string
8 import urlparse
9 import json
10 from urllib import urlencode
11 from requests_kerberos import HTTPKerberosAuth, OPTIONAL
12
13
class WrongPage(Exception):
    """
    Raised when a page does not contain the form a handler looks for.
    """
16
17
class PageTree(object):
    """
    Wraps an HTTP response and exposes XPath lookups on its body.

    The body is parsed with lxml's html.fromstring() on first access to
    the `tree` property and the parsed tree is reused afterwards.
    """

    def __init__(self, result):
        self._tree = None
        self.result = result
        self.text = result.text

    @property
    def tree(self):
        # Parse lazily: nothing is parsed until a lookup is requested.
        if self._tree is not None:
            return self._tree
        self._tree = html.fromstring(self.text)
        return self._tree

    def first_value(self, rule):
        """
        Evaluate the XPath `rule` and return a single value.

        A list result yields its first item, or None when the list is
        empty; any non-list result is returned as is.
        """
        found = self.tree.xpath(rule)
        if not isinstance(found, list):
            return found
        return found[0] if len(found) > 0 else None

    def all_values(self, rule):
        """
        Evaluate the XPath `rule` and always return a list.

        A non-list result is wrapped in a one-element list.
        """
        found = self.tree.xpath(rule)
        return found if isinstance(found, list) else [found]

    def make_referer(self):
        """
        Return the URL of the wrapped response, for use as a referer.
        """
        return self.result.url

    def expected_value(self, rule, expected):
        """
        Check that the first value matched by `rule` equals `expected`.

        Raises ValueError when the values differ.
        """
        actual = self.first_value(rule)
        if actual != expected:
            raise ValueError("Expected [%s], got [%s]" % (expected, actual))
53
54
class HttpSessions(object):
    """
    Registry of named servers, each with its own requests session, plus
    helpers that walk the login / SAML / OpenID form pages they serve.
    """

    def __init__(self):
        # name -> {'baseuri': ..., 'session': ...[, 'user'][, 'pwd']}
        self.servers = dict()

    def add_server(self, name, baseuri, user=None, pwd=None):
        """
        Register a server under `name` with a fresh requests session.

        The 'user' and 'pwd' entries are only stored when the
        corresponding argument is truthy.
        """
        new = {'baseuri': baseuri,
               'session': requests.Session()}
        if user:
            new['user'] = user
        if pwd:
            new['pwd'] = pwd
        self.servers[name] = new

    def get_session(self, url):
        """
        Return the session of the registered server that owns `url`.

        A server owns a URL when its baseuri is a prefix of it. When
        several baseuris match, the longest one wins so that the answer
        does not depend on the iteration order of the registry.

        Raises ValueError if no registered baseuri matches.
        """
        best = None
        for srv in self.servers.values():
            baseuri = srv['baseuri']
            if not url.startswith(baseuri):
                continue
            if best is None or len(baseuri) > len(best['baseuri']):
                best = srv

        if best is None:
            raise ValueError("Unknown URL: %s" % url)
        return best['session']

    def get(self, url, krb=False, **kwargs):
        """
        Issue a GET for `url` on the session that owns it.

        Redirects are not followed, except when `krb` is true: then a
        Kerberos auth handler is attached and redirects are followed.
        Extra keyword arguments are passed to the session's get().
        """
        session = self.get_session(url)
        allow_redirects = False
        if krb:
            # python-requests-kerberos isn't too bright about doing mutual
            # authentication and it tries to do it on any non-401 response
            # which doesn't work in our case since we follow redirects.
            kwargs['auth'] = HTTPKerberosAuth(
                mutual_authentication=OPTIONAL)
            allow_redirects = True
        return session.get(url, allow_redirects=allow_redirects, **kwargs)

    def post(self, url, **kwargs):
        """
        Issue a POST for `url` on the session that owns it, without
        following redirects.
        """
        session = self.get_session(url)
        return session.post(url, allow_redirects=False, **kwargs)

    def access(self, action, url, krb=False, **kwargs):
        """
        Dispatch to get() or post() according to `action`.

        `action` is matched case-insensitively. `krb` is only forwarded
        to get(). Raises ValueError for any other action.
        """
        # str.lower() instead of the Python 2-only string.lower().
        action = action.lower()
        if action == 'get':
            return self.get(url, krb, **kwargs)
        if action == 'post':
            return self.post(url, **kwargs)
        raise ValueError("Unknown action type: [%s]" % action)

    def new_url(self, referer, action):
        """
        Turn a form action into a URL usable with get()/post().

        An action starting with '/' is combined with the scheme and
        network location of `referer`; anything else is returned as is.
        """
        if not action.startswith('/'):
            return action
        u = urlparse.urlparse(referer)
        return '%s://%s%s' % (u.scheme, u.netloc, action)

    def get_form_data(self, page, form_id, input_fields):
        """
        Extract the action, method and input attributes of a form.

        The form is selected by `form_id`, or is any form on the page
        when `form_id` is empty. Only <input> elements that are direct
        children of the form are looked at.

        Returns a list: [action, method] followed by one list per name
        in `input_fields`, holding that attribute's values.

        NOTE: each attribute is collected with its own query, so an
        input lacking one of the attributes makes the lists differ in
        length and no longer line up by position.
        """
        form_selector = '//form'
        if form_id:
            form_selector += '[@id="%s"]' % form_id

        values = [page.first_value('%s/@action' % form_selector),
                  page.first_value('%s/@method' % form_selector)]
        for field in input_fields:
            values.append(page.all_values('%s/input/@%s' % (form_selector,
                                                            field)))
        return values

    def _build_form_request(self, page, form_id, error, title=None):
        """
        Build the request that submits the form `form_id` found on `page`.

        When `title` is given, the page's <title> text must equal it.

        Returns [method, url, {'headers': ..., 'data': ...}] where data
        maps each input name to the value found at the same position.

        Raises TypeError if `page` is not a PageTree, and WrongPage
        (with message `error`) if the title does not match, the form has
        no action, or looking the form up fails for any reason.
        """
        if not isinstance(page, PageTree):
            raise TypeError("Expected PageTree object")

        try:
            # The title lookup sits inside the guarded block so that a
            # failure while querying the page is reported as WrongPage,
            # exactly like a failure while reading the form itself.
            if title is not None and \
                    page.first_value('//title/text()') != title:
                raise WrongPage(error)
            (action_url, method, names, values) = self.get_form_data(
                page, form_id, ["name", "value"])
        except WrongPage:
            raise
        except Exception:  # pylint: disable=broad-except
            raise WrongPage(error)

        if action_url is None:
            raise WrongPage(error)

        referer = page.make_referer()
        payload = {}
        for index, name in enumerate(names):
            payload[name] = values[index]

        return [method, self.new_url(referer, action_url),
                {'headers': {'referer': referer}, 'data': payload}]

    def handle_login_form(self, idp, page):
        """
        Build the request that submits the "login_form" form, filled
        with the user and password registered for the server `idp`.

        Raises WrongPage if `page` is not a login form page.
        """
        if not isinstance(page, PageTree):
            raise TypeError("Expected PageTree object")

        srv = self.servers[idp]
        request = self._build_form_request(page, "login_form",
                                           "Not a Login Form Page")

        # replace known values
        data = request[2]['data']
        data['login_name'] = srv['user']
        data['login_password'] = srv['pwd']
        return request

    def handle_return_form(self, page):
        """
        Build the request that submits the "saml-response" form.

        Raises WrongPage if `page` does not contain that form.
        """
        return self._build_form_request(page, "saml-response",
                                        "Not a Return Form Page")

    def handle_openid_form(self, page):
        """
        Build the request that submits the form of a page titled
        'OpenID transaction in progress'.

        Raises WrongPage if the title differs or there is no form.
        """
        return self._build_form_request(
            page, None, "Not OpenID autosubmit form",
            title='OpenID transaction in progress')

    def handle_openid_consent_form(self, page):
        """
        Build the request that submits the "consent_form" form with the
        'decided_allow' field set to 'Allow'.

        Raises WrongPage if `page` does not contain that form.
        """
        request = self._build_form_request(
            page, "consent_form", "Not an OpenID Consent Form Page")

        # Replace known values
        request[2]['data']['decided_allow'] = 'Allow'
        return request

    def _next_form_request(self, idp, page):
        """
        Return the request for the first form recognised on `page`, or
        None when no handler recognises the page.
        """
        # Order matters: it is the order in which forms are probed.
        attempts = (
            lambda: self.handle_login_form(idp, page),
            lambda: self.handle_return_form(page),
            lambda: self.handle_openid_consent_form(page),
            lambda: self.handle_openid_form(page),
        )
        for attempt in attempts:
            try:
                return attempt()
            except WrongPage:
                continue
        return None

    def fetch_page(self, idp, target_url, follow_redirect=True, krb=False):
        """
        Fetch a page and parse the response code to determine what to do
        next.

        The login process consists of redirections (302/303) and
        potentially an unauthorized (401). For the case of unauthorized
        try the page returned in case of fallback authentication.

        Returns the PageTree of the response on which the walk stops:
        a redirect when `follow_redirect` is false, a 401 that offers
        no usable login form, or a 200 page with no recognised form.

        Raises ValueError on any other status code.

        NOTE: there is no upper bound on the number of requests; a
        server that keeps answering with a redirect or a recognised
        form keeps this loop running.
        """
        url = target_url
        action = 'get'
        args = {}

        while True:
            r = self.access(action, url, krb=krb, **args)
            status = r.status_code

            if status in (302, 303):
                if not follow_redirect:
                    return PageTree(r)
                url = r.headers['location']
                action = 'get'
                args = {}
                continue

            if status not in (200, 401):
                raise ValueError("Unhandled status (%d) on url %s" % (
                                 status, url))

            page = PageTree(r)

            if status == 401:
                if r.headers.get('WWW-Authenticate', None) is None:
                    return page

                # Fall back, hopefully to testauth authentication.
                try:
                    (action, url, args) = self.handle_login_form(idp, page)
                except WrongPage:
                    # No login form to fall back to. Stop here: looping
                    # again would only repeat the very same request.
                    return page
                continue

            request = self._next_form_request(idp, page)
            if request is None:
                # Either we got what we wanted, or we have to stop anyway
                return page
            (action, url, args) = request

    def auth_to_idp(self, idp, krb=False):
        """
        Log in to the server `idp` through its 'Log In' link and check
        that the resulting page welcomes the registered user.

        Raises ValueError if the front page is not served with a 200 or
        if an expected page element does not match.
        """
        srv = self.servers[idp]
        target_url = '%s/%s/' % (srv['baseuri'], idp)

        r = self.access('get', target_url, krb=krb)
        if r.status_code != 200:
            raise ValueError("Access to idp failed: %s" % repr(r))

        page = PageTree(r)
        page.expected_value('//div[@id="content"]/p/a/text()', 'Log In')
        href = page.first_value('//div[@id="content"]/p/a/@href')
        url = self.new_url(target_url, href)

        page = self.fetch_page(idp, url, krb=krb)

        page.expected_value('//div[@id="welcome"]/p/text()',
                            'Welcome %s!' % srv['user'])

    def logout_from_idp(self, idp):
        """
        Request the logout URL of the server `idp`.

        Raises ValueError if the response status is not 200.
        """
        srv = self.servers[idp]
        target_url = '%s/%s/logout' % (srv['baseuri'], idp)

        r = self.access('get', target_url)
        if r.status_code != 200:
            raise ValueError("Logout from idp failed: %s" % repr(r))

    def get_sp_metadata(self, idp, sp):
        """
        Fetch the SAML2 metadata of the server `sp`.

        Returns a tuple (baseuri of `idp`, response of the metadata
        request). The request does not go through a stored session.
        """
        idpuri = self.servers[idp]['baseuri']
        spuri = self.servers[sp]['baseuri']

        return (idpuri, requests.get('%s/saml2/metadata' % spuri))

    def add_sp_metadata(self, idp, sp, rest=False):
        """
        Register the metadata of `sp` on `idp`.

        With `rest` the metadata is posted url-encoded to the REST
        endpoint and a 201 is expected; otherwise it is uploaded as a
        file to the admin page, a 200 is expected and the page must
        report 'SP Successfully added'.

        Raises ValueError if the status or the page is not as expected.
        """
        (idpuri, m) = self.get_sp_metadata(idp, sp)
        url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp)
        headers = {'referer': url}
        if rest:
            expected_status = 201
            payload = {'metadata': m.content}
            headers['content-type'] = 'application/x-www-form-urlencoded'
            url = '%s/%s/rest/providers/saml2/SPS/%s' % (idpuri, idp, sp)
            r = self.post(url, headers=headers, data=urlencode(payload))
        else:
            expected_status = 200
            metafile = {'metafile': m.content}
            payload = {'name': sp}
            r = self.post(url, headers=headers, data=payload, files=metafile)
        if r.status_code != expected_status:
            raise ValueError('Failed to post SP data [%s]' % repr(r))

        if not rest:
            page = PageTree(r)
            page.expected_value('//div[@class="alert alert-success"]/p/text()',
                                'SP Successfully added')

    def set_sp_default_nameids(self, idp, sp, nameids):
        """
        nameids is a list of Name ID formats to enable

        Raises ValueError if the response status is not 200.
        """
        idpsrv = self.servers[idp]
        idpuri = idpsrv['baseuri']
        url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (idpuri, idp, sp)
        headers = {'referer': url,
                   'content-type': 'application/x-www-form-urlencoded'}
        payload = {'submit': 'Submit',
                   'allowed_nameids': ', '.join(nameids)}
        r = idpsrv['session'].post(url, headers=headers,
                                   data=payload)
        if r.status_code != 200:
            raise ValueError('Failed to post SP data [%s]' % repr(r))

    def set_attributes_and_mapping(self, idp, mapping=None, attrs=None,
                                   spname=None):
        """
        Set allowed attributes and mapping in the IDP or the SP. In the
        case of the SP both allowed attributes and the mapping need to
        be provided. An empty option for either means delete all values.

        mapping is a list of list of rules of the form:
           [['from-1', 'to-1'], ['from-2', 'to-2']]

        ex. [['*', '*'], ['fullname', 'namefull']]

        attrs is the list of attributes that will be allowed:
           ['fullname', 'givenname', 'surname']

        Omitting mapping or attrs (None) is the same as passing an
        empty list.

        Raises ValueError if the response status is not 200.
        """
        idpsrv = self.servers[idp]
        idpuri = idpsrv['baseuri']
        if spname:  # per-SP setting
            url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (
                idpuri, idp, spname)
            mapname = 'Attribute Mapping'
            attrname = 'Allowed Attributes'
        else:  # global default
            url = '%s/%s/admin/providers/saml2' % (idpuri, idp)
            mapname = 'default attribute mapping'
            attrname = 'default allowed attributes'

        headers = {'referer': url,
                   'content-type': 'application/x-www-form-urlencoded'}
        payload = {'submit': 'Submit'}
        for count, m in enumerate(mapping or []):
            payload['%s %s-from' % (mapname, count)] = m[0]
            payload['%s %s-to' % (mapname, count)] = m[1]
        for count, attr in enumerate(attrs or []):
            payload['%s %s-name' % (attrname, count)] = attr
        r = idpsrv['session'].post(url, headers=headers,
                                   data=payload)
        if r.status_code != 200:
            raise ValueError('Failed to post IDP data [%s]' % repr(r))

    def fetch_rest_page(self, idpname, uri):
        """
        idpname - the name of the IDP to fetch the page from
        uri - the URI of the page to retrieve

        The URL for the request is built from known-information in
        the session.

        returns the decoded JSON document if successful
        raises ValueError if the output is unparseable
        """
        baseurl = self.servers[idpname].get('baseuri')
        page = self.fetch_page(
            idpname,
            '%s%s' % (baseurl, uri)
        )
        return json.loads(page.text)

    def get_rest_sp(self, idpname, spname=None):
        """
        Fetch the REST description of the SP `spname` registered on
        `idpname`, or of all SPs when `spname` is None.
        """
        if spname is None:
            uri = '/%s/rest/providers/saml2/SPS/' % idpname
        else:
            uri = '/%s/rest/providers/saml2/SPS/%s' % (idpname, spname)

        return self.fetch_rest_page(idpname, uri)