0da7ee25b63c117438903dcf3b8ef6f2979349ff
[cascardo/ipsilon.git] / tests / helpers / http.py
1 #!/usr/bin/python
2 #
3 # Copyright (C) 2014  Simo Sorce <simo@redhat.com>
4 #
5 # see file 'COPYING' for use and warranty information
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19
20
21 from lxml import html
22 import requests
23 import string
24 import urlparse
25 import json
26 from urllib import urlencode
27 from requests_kerberos import HTTPKerberosAuth, OPTIONAL
28
29
30 class WrongPage(Exception):
31     pass
32
33
34 class PageTree(object):
35
36     def __init__(self, result):
37         self.result = result
38         self.text = result.text
39         self._tree = None
40
41     @property
42     def tree(self):
43         if self._tree is None:
44             self._tree = html.fromstring(self.text)
45         return self._tree
46
47     def first_value(self, rule):
48         result = self.tree.xpath(rule)
49         if type(result) is list:
50             if len(result) > 0:
51                 result = result[0]
52             else:
53                 result = None
54         return result
55
56     def all_values(self, rule):
57         result = self.tree.xpath(rule)
58         if type(result) is list:
59             return result
60         return [result]
61
62     def make_referer(self):
63         return self.result.url
64
65     def expected_value(self, rule, expected):
66         value = self.first_value(rule)
67         if value != expected:
68             raise ValueError("Expected [%s], got [%s]" % (expected, value))
69
70
71 class HttpSessions(object):
72
73     def __init__(self):
74         self.servers = dict()
75
76     def add_server(self, name, baseuri, user=None, pwd=None):
77         new = {'baseuri': baseuri,
78                'session': requests.Session()}
79         if user:
80             new['user'] = user
81         if pwd:
82             new['pwd'] = pwd
83         self.servers[name] = new
84
85     def get_session(self, url):
86         for srv in self.servers:
87             d = self.servers[srv]
88             if url.startswith(d['baseuri']):
89                 return d['session']
90
91         raise ValueError("Unknown URL: %s" % url)
92
93     def get(self, url, krb=False, **kwargs):
94         session = self.get_session(url)
95         allow_redirects = False
96         if krb:
97             # In at least the test instance we don't get back a negotiate
98             # blob to do mutual authentication against.
99             kerberos_auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
100             kwargs['auth'] = kerberos_auth
101             allow_redirects = True
102         return session.get(url, allow_redirects=allow_redirects, **kwargs)
103
104     def post(self, url, **kwargs):
105         session = self.get_session(url)
106         return session.post(url, allow_redirects=False, **kwargs)
107
108     def access(self, action, url, krb=False, **kwargs):
109         action = string.lower(action)
110         if action == 'get':
111             return self.get(url, krb, **kwargs)
112         elif action == 'post':
113             return self.post(url, **kwargs)
114         else:
115             raise ValueError("Unknown action type: [%s]" % action)
116
117     def new_url(self, referer, action):
118         if action.startswith('/'):
119             u = urlparse.urlparse(referer)
120             return '%s://%s%s' % (u.scheme, u.netloc, action)
121         return action
122
123     def get_form_data(self, page, form_id, input_fields):
124         form_selector = '//form'
125         if form_id:
126             form_selector += '[@id="%s"]' % form_id
127         values = []
128         action = page.first_value('%s/@action' % form_selector)
129         values.append(action)
130         method = page.first_value('%s/@method' % form_selector)
131         values.append(method)
132         for field in input_fields:
133             value = page.all_values('%s/input/@%s' % (form_selector,
134                                                       field))
135             values.append(value)
136         return values
137
138     def handle_login_form(self, idp, page):
139         if type(page) != PageTree:
140             raise TypeError("Expected PageTree object")
141
142         srv = self.servers[idp]
143
144         try:
145             results = self.get_form_data(page, "login_form", ["name", "value"])
146             action_url = results[0]
147             method = results[1]
148             names = results[2]
149             values = results[3]
150             if action_url is None:
151                 raise Exception
152         except Exception:  # pylint: disable=broad-except
153             raise WrongPage("Not a Login Form Page")
154
155         referer = page.make_referer()
156         headers = {'referer': referer}
157         payload = {}
158         for i in range(0, len(names)):
159             payload[names[i]] = values[i]
160
161         # replace known values
162         payload['login_name'] = srv['user']
163         payload['login_password'] = srv['pwd']
164
165         return [method, self.new_url(referer, action_url),
166                 {'headers': headers, 'data': payload}]
167
168     def handle_return_form(self, page):
169         if type(page) != PageTree:
170             raise TypeError("Expected PageTree object")
171
172         try:
173             results = self.get_form_data(page, "saml-response",
174                                          ["name", "value"])
175             action_url = results[0]
176             if action_url is None:
177                 raise Exception
178             method = results[1]
179             names = results[2]
180             values = results[3]
181         except Exception:  # pylint: disable=broad-except
182             raise WrongPage("Not a Return Form Page")
183
184         referer = page.make_referer()
185         headers = {'referer': referer}
186
187         payload = {}
188         for i in range(0, len(names)):
189             payload[names[i]] = values[i]
190
191         return [method, self.new_url(referer, action_url),
192                 {'headers': headers, 'data': payload}]
193
194     def handle_openid_form(self, page):
195         if type(page) != PageTree:
196             raise TypeError("Expected PageTree object")
197
198         if not page.first_value('//title/text()') == \
199                 'OpenID transaction in progress':
200             raise WrongPage('Not OpenID autosubmit form')
201
202         try:
203             results = self.get_form_data(page, None,
204                                          ["name", "value"])
205             action_url = results[0]
206             if action_url is None:
207                 raise Exception
208             method = results[1]
209             names = results[2]
210             values = results[3]
211         except Exception:  # pylint: disable=broad-except
212             raise WrongPage("Not OpenID autosubmit form")
213
214         referer = page.make_referer()
215         headers = {'referer': referer}
216
217         payload = {}
218         for i in range(0, len(names)):
219             payload[names[i]] = values[i]
220
221         return [method, self.new_url(referer, action_url),
222                 {'headers': headers, 'data': payload}]
223
224     def handle_openid_consent_form(self, page):
225         if type(page) != PageTree:
226             raise TypeError("Expected PageTree object")
227
228         try:
229             results = self.get_form_data(page, "consent_form",
230                                          ['name', 'value'])
231             action_url = results[0]
232             if action_url is None:
233                 raise Exception
234             method = results[1]
235             names = results[2]
236             values = results[3]
237         except Exception:  # pylint: disable=broad-except
238             raise WrongPage("Not an OpenID Consent Form Page")
239
240         referer = page.make_referer()
241         headers = {'referer': referer}
242
243         payload = {}
244         for i in range(0, len(names)):
245             payload[names[i]] = values[i]
246
247         # Replace known values
248         payload['decided_allow'] = 'Allow'
249
250         return [method, self.new_url(referer, action_url),
251                 {'headers': headers, 'data': payload}]
252
253     def fetch_page(self, idp, target_url, follow_redirect=True, krb=False):
254         """
255         Fetch a page and parse the response code to determine what to do
256         next.
257
258         The login process consists of redirections (302/303) and
259         potentially an unauthorized (401). For the case of unauthorized
260         try the page returned in case of fallback authentication.
261         """
262         url = target_url
263         action = 'get'
264         args = {}
265
266         while True:
267             # pylint: disable=star-args
268             r = self.access(action, url, krb=krb, **args)
269             if r.status_code == 303 or r.status_code == 302:
270                 if not follow_redirect:
271                     return PageTree(r)
272                 url = r.headers['location']
273                 action = 'get'
274                 args = {}
275             elif r.status_code == 401:
276                 page = PageTree(r)
277                 if r.headers.get('WWW-Authenticate', None) is None:
278                     return page
279
280                 # Fall back, hopefully to testauth authentication.
281                 try:
282                     (action, url, args) = self.handle_login_form(idp, page)
283                     continue
284                 except WrongPage:
285                     pass
286             elif r.status_code == 200:
287                 page = PageTree(r)
288
289                 try:
290                     (action, url, args) = self.handle_login_form(idp, page)
291                     continue
292                 except WrongPage:
293                     pass
294
295                 try:
296                     (action, url, args) = self.handle_return_form(page)
297                     continue
298                 except WrongPage:
299                     pass
300
301                 try:
302                     (action, url, args) = self.handle_openid_consent_form(page)
303                     continue
304                 except WrongPage:
305                     pass
306
307                 try:
308                     (action, url, args) = self.handle_openid_form(page)
309                     continue
310                 except WrongPage:
311                     pass
312
313                 # Either we got what we wanted, or we have to stop anyway
314                 return page
315             else:
316                 raise ValueError("Unhandled status (%d) on url %s" % (
317                                  r.status_code, url))
318
319     def auth_to_idp(self, idp, krb=False):
320
321         srv = self.servers[idp]
322         target_url = '%s/%s/' % (srv['baseuri'], idp)
323
324         r = self.access('get', target_url, krb=krb)
325         if r.status_code != 200:
326             raise ValueError("Access to idp failed: %s" % repr(r))
327
328         page = PageTree(r)
329         page.expected_value('//div[@id="content"]/p/a/text()', 'Log In')
330         href = page.first_value('//div[@id="content"]/p/a/@href')
331         url = self.new_url(target_url, href)
332
333         page = self.fetch_page(idp, url, krb=krb)
334
335         page.expected_value('//div[@id="welcome"]/p/text()',
336                             'Welcome %s!' % srv['user'])
337
338     def logout_from_idp(self, idp):
339
340         srv = self.servers[idp]
341         target_url = '%s/%s/logout' % (srv['baseuri'], idp)
342
343         r = self.access('get', target_url)
344         if r.status_code != 200:
345             raise ValueError("Logout from idp failed: %s" % repr(r))
346
347     def get_sp_metadata(self, idp, sp):
348         idpsrv = self.servers[idp]
349         idpuri = idpsrv['baseuri']
350
351         spuri = self.servers[sp]['baseuri']
352
353         return (idpuri, requests.get('%s/saml2/metadata' % spuri))
354
355     def add_sp_metadata(self, idp, sp, rest=False):
356         expected_status = 200
357         (idpuri, m) = self.get_sp_metadata(idp, sp)
358         url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp)
359         headers = {'referer': url}
360         if rest:
361             expected_status = 201
362             payload = {'metadata': m.content}
363             headers['content-type'] = 'application/x-www-form-urlencoded'
364             url = '%s/%s/rest/providers/saml2/SPS/%s' % (idpuri, idp, sp)
365             r = self.post(url, headers=headers, data=urlencode(payload))
366         else:
367             metafile = {'metafile': m.content}
368             payload = {'name': sp}
369             r = self.post(url, headers=headers, data=payload, files=metafile)
370         if r.status_code != expected_status:
371             raise ValueError('Failed to post SP data [%s]' % repr(r))
372
373         if not rest:
374             page = PageTree(r)
375             page.expected_value('//div[@class="alert alert-success"]/p/text()',
376                                 'SP Successfully added')
377
378     def set_sp_default_nameids(self, idp, sp, nameids):
379         """
380         nameids is a list of Name ID formats to enable
381         """
382         idpsrv = self.servers[idp]
383         idpuri = idpsrv['baseuri']
384         url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (idpuri, idp, sp)
385         headers = {'referer': url}
386         headers['content-type'] = 'application/x-www-form-urlencoded'
387         payload = {'submit': 'Submit',
388                    'allowed_nameids': ', '.join(nameids)}
389         r = idpsrv['session'].post(url, headers=headers,
390                                    data=payload)
391         if r.status_code != 200:
392             raise ValueError('Failed to post SP data [%s]' % repr(r))
393
394     # pylint: disable=dangerous-default-value
395     def set_attributes_and_mapping(self, idp, mapping=[], attrs=[],
396                                    spname=None):
397         """
398         Set allowed attributes and mapping in the IDP or the SP. In the
399         case of the SP both allowed attributes and the mapping need to
400         be provided. An empty option for either means delete all values.
401
402         mapping is a list of list of rules of the form:
403            [['from-1', 'to-1'], ['from-2', 'from-2']]
404
405         ex. [['*', '*'], ['fullname', 'namefull']]
406
407         attrs is the list of attributes that will be allowed:
408            ['fullname', 'givenname', 'surname']
409         """
410         idpsrv = self.servers[idp]
411         idpuri = idpsrv['baseuri']
412         if spname:  # per-SP setting
413             url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (
414                 idpuri, idp, spname)
415             mapname = 'Attribute Mapping'
416             attrname = 'Allowed Attributes'
417         else:  # global default
418             url = '%s/%s/admin/providers/saml2' % (idpuri, idp)
419             mapname = 'default attribute mapping'
420             attrname = 'default allowed attributes'
421
422         headers = {'referer': url}
423         headers['content-type'] = 'application/x-www-form-urlencoded'
424         payload = {'submit': 'Submit'}
425         count = 0
426         for m in mapping:
427             payload['%s %s-from' % (mapname, count)] = m[0]
428             payload['%s %s-to' % (mapname, count)] = m[1]
429             count += 1
430         count = 0
431         for attr in attrs:
432             payload['%s %s-name' % (attrname, count)] = attr
433             count += 1
434         r = idpsrv['session'].post(url, headers=headers,
435                                    data=payload)
436         if r.status_code != 200:
437             raise ValueError('Failed to post IDP data [%s]' % repr(r))
438
439     def fetch_rest_page(self, idpname, uri):
440         """
441         idpname - the name of the IDP to fetch the page from
442         uri - the URI of the page to retrieve
443
444         The URL for the request is built from known-information in
445         the session.
446
447         returns dict if successful
448         returns ValueError if the output is unparseable
449         """
450         baseurl = self.servers[idpname].get('baseuri')
451         page = self.fetch_page(
452             idpname,
453             '%s%s' % (baseurl, uri)
454         )
455         return json.loads(page.text)
456
457     def get_rest_sp(self, idpname, spname=None):
458         if spname is None:
459             uri = '/%s/rest/providers/saml2/SPS/' % idpname
460         else:
461             uri = '/%s/rest/providers/saml2/SPS/%s' % (idpname, spname)
462
463         return self.fetch_rest_page(idpname, uri)