pam: use a pam object method instead of pam module function
[cascardo/ipsilon.git] / tests / helpers / http.py
1 #!/usr/bin/python
2 #
3 # Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING
4
5 from lxml import html
6 import requests
7 import string
8 import urlparse
9 import json
10 from urllib import urlencode
11 from requests_kerberos import HTTPKerberosAuth, OPTIONAL
12
13
class WrongPage(Exception):
    """Raised when a page is not the kind of form a handler expects."""
16
17
class PageTree(object):
    """HTML view of an HTTP response, parsed on first use.

    `result` is the response object; its `text` is kept as the page
    source and its `url` is used as the referer for follow-up requests.
    """

    def __init__(self, result):
        self.result = result
        self.text = result.text
        # Parsed document; built on demand by the `tree` property.
        self._tree = None

    @property
    def tree(self):
        """Return the parsed document, parsing `self.text` only once."""
        if self._tree is not None:
            return self._tree
        self._tree = html.fromstring(self.text)
        return self._tree

    def first_value(self, rule):
        """Evaluate the XPath `rule` and return its first match.

        A list result yields its first element, or None when the list is
        empty; a non-list result is returned unchanged.
        """
        found = self.tree.xpath(rule)
        if not isinstance(found, list):
            return found
        return found[0] if len(found) > 0 else None

    def all_values(self, rule):
        """Evaluate the XPath `rule` and always return a list.

        A non-list result is wrapped in a single-element list.
        """
        found = self.tree.xpath(rule)
        return found if isinstance(found, list) else [found]

    def make_referer(self):
        """Return the URL of the underlying response."""
        return self.result.url

    def expected_value(self, rule, expected):
        """Raise ValueError unless the first match of `rule` is `expected`."""
        actual = self.first_value(rule)
        if actual != expected:
            raise ValueError("Expected [%s], got [%s]" % (expected, actual))
53
54
class HttpSessions(object):
    """Drive HTTP conversations with a set of named test servers.

    Every registered server has a base URI and its own session object,
    plus optional credentials that are used to fill in login forms.
    """

    def __init__(self):
        # Maps server name -> dict with 'baseuri' and 'session' and,
        # when provided, 'user' and 'pwd'.
        self.servers = dict()

    def add_server(self, name, baseuri, user=None, pwd=None):
        """Register server `name` at `baseuri` with a fresh session.

        The 'user' and 'pwd' keys are stored only for truthy values.
        """
        new = {'baseuri': baseuri,
               'session': requests.Session()}
        if user:
            new['user'] = user
        if pwd:
            new['pwd'] = pwd
        self.servers[name] = new

    def get_session(self, url):
        """Return the session of the server whose base URI prefixes `url`.

        Raises ValueError when no registered server matches.
        """
        for srv in self.servers.values():
            if url.startswith(srv['baseuri']):
                return srv['session']

        raise ValueError("Unknown URL: %s" % url)

    def get(self, url, krb=False, **kwargs):
        """Issue a GET on the session matching `url`.

        Redirects are not followed unless `krb` is set, in which case
        Kerberos authentication is attached to the request.
        """
        session = self.get_session(url)
        allow_redirects = False
        if krb:
            # python-requests-kerberos isn't too bright about doing mutual
            # authentication and it tries to do it on any non-401 response
            # which doesn't work in our case since we follow redirects.
            kwargs['auth'] = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
            allow_redirects = True
        return session.get(url, allow_redirects=allow_redirects, **kwargs)

    def post(self, url, **kwargs):
        """Issue a POST on the session matching `url`, without redirects."""
        session = self.get_session(url)
        return session.post(url, allow_redirects=False, **kwargs)

    def access(self, action, url, krb=False, **kwargs):
        """Dispatch to get() or post() based on `action` (case-insensitive).

        `krb` only affects GET requests. Raises ValueError for any other
        action.
        """
        # str.lower() instead of the deprecated string.lower() function,
        # which no longer exists on Python 3.
        action = action.lower()
        if action == 'get':
            return self.get(url, krb, **kwargs)
        elif action == 'post':
            return self.post(url, **kwargs)
        else:
            raise ValueError("Unknown action type: [%s]" % action)

    def new_url(self, referer, action):
        """Resolve a form `action` against the `referer` URL.

        An action starting with '/' is combined with the scheme and host
        of `referer`; anything else is returned unchanged.
        """
        if action.startswith('/'):
            u = urlparse.urlparse(referer)
            return '%s://%s%s' % (u.scheme, u.netloc, action)
        return action

    def get_form_data(self, page, form_id, input_fields):
        """Extract action, method and input attributes from a form.

        `form_id` selects the form by id; a falsy value matches any form.
        Returns a list: [action, method] followed by one list per entry
        of `input_fields`, holding that attribute for the inputs that are
        direct children of the form.
        """
        form_selector = '//form'
        if form_id:
            form_selector += '[@id="%s"]' % form_id
        values = [page.first_value('%s/@action' % form_selector),
                  page.first_value('%s/@method' % form_selector)]
        for field in input_fields:
            values.append(page.all_values('%s/input/@%s' % (form_selector,
                                                            field)))
        return values

    @staticmethod
    def _check_page(page):
        """Raise TypeError unless `page` is a PageTree."""
        if not isinstance(page, PageTree):
            raise TypeError("Expected PageTree object")

    def _form_request(self, page, form_id, error):
        """Build the [method, url, kwargs] request that submits a form.

        Raises WrongPage(error) when the form cannot be parsed or has no
        action attribute.
        """
        try:
            results = self.get_form_data(page, form_id, ["name", "value"])
        except Exception:  # pylint: disable=broad-except
            raise WrongPage(error)
        action_url, method, names, values = results
        if action_url is None:
            raise WrongPage(error)

        referer = page.make_referer()
        payload = dict(zip(names, values))
        return [method, self.new_url(referer, action_url),
                {'headers': {'referer': referer}, 'data': payload}]

    def handle_login_form(self, idp, page):
        """Return the request that submits the login form on `page`.

        The credentials registered for server `idp` replace the
        'login_name' and 'login_password' fields. Raises TypeError if
        `page` is not a PageTree and WrongPage if it has no login form.
        """
        self._check_page(page)
        srv = self.servers[idp]

        request = self._form_request(page, "login_form",
                                     "Not a Login Form Page")

        # replace known values
        payload = request[2]['data']
        payload['login_name'] = srv['user']
        payload['login_password'] = srv['pwd']
        return request

    def handle_return_form(self, page):
        """Return the request that submits the 'saml-response' form.

        Raises TypeError if `page` is not a PageTree and WrongPage if the
        form is not present.
        """
        self._check_page(page)
        return self._form_request(page, "saml-response",
                                  "Not a Return Form Page")

    def handle_openid_form(self, page):
        """Return the request that submits the OpenID autosubmit form.

        The page is recognized by its title; the first form found is
        used. Raises TypeError if `page` is not a PageTree and WrongPage
        if the page does not match.
        """
        self._check_page(page)

        if not page.first_value('//title/text()') == \
                'OpenID transaction in progress':
            raise WrongPage('Not OpenID autosubmit form')

        return self._form_request(page, None, "Not OpenID autosubmit form")

    def handle_openid_consent_form(self, page):
        """Return the request that approves the OpenID consent form.

        Raises TypeError if `page` is not a PageTree and WrongPage if the
        form is not present.
        """
        self._check_page(page)
        request = self._form_request(page, "consent_form",
                                     "Not an OpenID Consent Form Page")

        # Replace known values
        request[2]['data']['decided_allow'] = 'Allow'
        return request

    def _next_form_request(self, idp, page):
        """Return the request for the first form recognized on `page`.

        Handlers are tried in a fixed order; None means no handler
        recognized the page.
        """
        try:
            return self.handle_login_form(idp, page)
        except WrongPage:
            pass

        for handler in (self.handle_return_form,
                        self.handle_openid_consent_form,
                        self.handle_openid_form):
            try:
                return handler(page)
            except WrongPage:
                continue
        return None

    def fetch_page(self, idp, target_url, follow_redirect=True, krb=False):
        """
        Fetch a page and parse the response code to determine what to do
        next.

        The login process consists of redirections (302/303) and
        potentially an unauthorized (401). For the case of unauthorized
        try the page returned in case of fallback authentication.

        Returns the final PageTree. Raises ValueError on any other
        status code.
        """
        url = target_url
        action = 'get'
        args = {}

        while True:
            r = self.access(action, url, krb=krb, **args)
            if r.status_code in (302, 303):
                if not follow_redirect:
                    return PageTree(r)
                url = r.headers['location']
                action = 'get'
                args = {}
            elif r.status_code == 401:
                page = PageTree(r)
                if r.headers.get('WWW-Authenticate', None) is None:
                    return page

                # Fall back, hopefully to testauth authentication.
                try:
                    (action, url, args) = self.handle_login_form(idp, page)
                except WrongPage:
                    # No login form to fall back to. Repeating the very
                    # same request would loop forever, so give up here.
                    return page
            elif r.status_code == 200:
                page = PageTree(r)
                request = self._next_form_request(idp, page)
                if request is None:
                    # Either we got what we wanted, or we have to stop
                    # anyway
                    return page
                (action, url, args) = request
            else:
                raise ValueError("Unhandled status (%d) on url %s" % (
                                 r.status_code, url))

    def auth_to_idp(self, idp, krb=False, rule=None, expected=None):
        """Log in to `idp` and verify the resulting page.

        Follows the 'Log In' link of the IdP front page, then checks that
        the XPath `rule` yields `expected`. Both default to the welcome
        message for the registered user. Raises ValueError on failure.
        """
        srv = self.servers[idp]
        target_url = '%s/%s/' % (srv['baseuri'], idp)

        r = self.access('get', target_url, krb=krb)
        if r.status_code != 200:
            raise ValueError("Access to idp failed: %s" % repr(r))

        page = PageTree(r)
        page.expected_value('//div[@id="content"]/p/a/text()', 'Log In')
        href = page.first_value('//div[@id="content"]/p/a/@href')
        url = self.new_url(target_url, href)

        page = self.fetch_page(idp, url, krb=krb)

        if rule is None:
            rule = '//div[@id="welcome"]/p/text()'
        if expected is None:
            expected = 'Welcome %s!' % srv['user']

        page.expected_value(rule, expected)

    def logout_from_idp(self, idp):
        """GET the logout URL of `idp`; raise ValueError unless it is 200."""
        srv = self.servers[idp]
        target_url = '%s/%s/logout' % (srv['baseuri'], idp)

        r = self.access('get', target_url)
        if r.status_code != 200:
            raise ValueError("Logout from idp failed: %s" % repr(r))

    def get_sp_metadata(self, idp, sp):
        """Return (idp base URI, response of the SP metadata request).

        The metadata is fetched outside of any registered session.
        """
        idpuri = self.servers[idp]['baseuri']
        spuri = self.servers[sp]['baseuri']

        return (idpuri, requests.get('%s/saml2/metadata' % spuri))

    def add_sp_metadata(self, idp, sp, rest=False):
        """Register the metadata of `sp` with `idp`.

        With `rest` the url-encoded data is posted to the REST endpoint
        and a 201 is expected. Otherwise the metadata is uploaded through
        the admin form, which must answer 200 with a success message.
        Raises ValueError on failure.
        """
        expected_status = 200
        (idpuri, m) = self.get_sp_metadata(idp, sp)
        url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp)
        headers = {'referer': url}
        if rest:
            expected_status = 201
            payload = {
                'metadata': m.content,
                'visible': True,
                'description': sp,
                'image': 'Zm9v',
                'splink': 'http://test.example.com/secret/',
            }
            headers['content-type'] = 'application/x-www-form-urlencoded'
            url = '%s/%s/rest/providers/saml2/SPS/%s' % (idpuri, idp, sp)
            r = self.post(url, headers=headers, data=urlencode(payload))
        else:
            metafile = {'metafile': m.content}
            payload = {'name': sp}
            r = self.post(url, headers=headers, data=payload, files=metafile)
        if r.status_code != expected_status:
            raise ValueError('Failed to post SP data [%s]' % repr(r))

        if not rest:
            page = PageTree(r)
            page.expected_value('//div[@class="alert alert-success"]/p/text()',
                                'SP Successfully added')

    def set_sp_default_nameids(self, idp, sp, nameids):
        """
        nameids is a list of Name ID formats to enable

        Raises ValueError unless the admin page answers 200.
        """
        idpsrv = self.servers[idp]
        idpuri = idpsrv['baseuri']
        url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (idpuri, idp, sp)
        headers = {'referer': url,
                   'content-type': 'application/x-www-form-urlencoded'}
        payload = {'submit': 'Submit',
                   'allowed_nameids': ', '.join(nameids)}
        r = idpsrv['session'].post(url, headers=headers,
                                   data=payload)
        if r.status_code != 200:
            raise ValueError('Failed to post SP data [%s]' % repr(r))

    def set_attributes_and_mapping(self, idp, mapping=None, attrs=None,
                                   spname=None):
        """
        Set allowed attributes and mapping in the IDP or the SP. In the
        case of the SP both allowed attributes and the mapping need to
        be provided. An empty option for either means delete all values.

        mapping is a list of list of rules of the form:
           [['from-1', 'to-1'], ['from-2', 'to-2']]

        ex. [['*', '*'], ['fullname', 'namefull']]

        attrs is the list of attributes that will be allowed:
           ['fullname', 'givenname', 'surname']

        Leaving mapping or attrs unset (None) is the same as passing an
        empty list. Raises ValueError unless the admin page answers 200.
        """
        idpsrv = self.servers[idp]
        idpuri = idpsrv['baseuri']
        if spname:  # per-SP setting
            url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (
                idpuri, idp, spname)
            mapname = 'Attribute Mapping'
            attrname = 'Allowed Attributes'
        else:  # global default
            url = '%s/%s/admin/providers/saml2' % (idpuri, idp)
            mapname = 'default attribute mapping'
            attrname = 'default allowed attributes'

        headers = {'referer': url,
                   'content-type': 'application/x-www-form-urlencoded'}
        payload = {'submit': 'Submit'}
        for count, m in enumerate(mapping or ()):
            payload['%s %s-from' % (mapname, count)] = m[0]
            payload['%s %s-to' % (mapname, count)] = m[1]
        for count, attr in enumerate(attrs or ()):
            payload['%s %s-name' % (attrname, count)] = attr
        r = idpsrv['session'].post(url, headers=headers,
                                   data=payload)
        if r.status_code != 200:
            raise ValueError('Failed to post IDP data [%s]' % repr(r))

    def fetch_rest_page(self, idpname, uri):
        """
        idpname - the name of the IDP to fetch the page from
        uri - the URI of the page to retrieve

        The URL for the request is built from known-information in
        the session.

        returns the decoded JSON document if successful
        raises ValueError if the output is unparseable
        """
        baseurl = self.servers[idpname].get('baseuri')
        page = self.fetch_page(
            idpname,
            '%s%s' % (baseurl, uri)
        )
        return json.loads(page.text)

    def get_rest_sp(self, idpname, spname=None):
        """Fetch the REST description of `spname`, or of all SPs if None."""
        if spname is None:
            uri = '/%s/rest/providers/saml2/SPS/' % idpname
        else:
            uri = '/%s/rest/providers/saml2/SPS/%s' % (idpname, spname)

        return self.fetch_rest_page(idpname, uri)