bfa32409dc04a288205d0535c1c201709482a873
[cascardo/ipsilon.git] / tests / helpers / http.py
1 #!/usr/bin/python
2 #
3 # Copyright (C) 2014  Simo Sorce <simo@redhat.com>
4 #
5 # see file 'COPYING' for use and warranty information
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19
20
21 from lxml import html
22 import requests
23 import string
24 import urlparse
25 import json
26 from urllib import urlencode
27 from requests_kerberos import HTTPKerberosAuth, OPTIONAL
28
29
30 class WrongPage(Exception):
31     pass
32
33
34 class PageTree(object):
35
36     def __init__(self, result):
37         self.result = result
38         self.text = result.text
39         self._tree = None
40
41     @property
42     def tree(self):
43         if self._tree is None:
44             self._tree = html.fromstring(self.text)
45         return self._tree
46
47     def first_value(self, rule):
48         result = self.tree.xpath(rule)
49         if type(result) is list:
50             if len(result) > 0:
51                 result = result[0]
52             else:
53                 result = None
54         return result
55
56     def all_values(self, rule):
57         result = self.tree.xpath(rule)
58         if type(result) is list:
59             return result
60         return [result]
61
62     def make_referer(self):
63         return self.result.url
64
65     def expected_value(self, rule, expected):
66         value = self.first_value(rule)
67         if value != expected:
68             raise ValueError("Expected [%s], got [%s]" % (expected, value))
69
70
71 class HttpSessions(object):
72
73     def __init__(self):
74         self.servers = dict()
75
76     def add_server(self, name, baseuri, user=None, pwd=None):
77         new = {'baseuri': baseuri,
78                'session': requests.Session()}
79         if user:
80             new['user'] = user
81         if pwd:
82             new['pwd'] = pwd
83         self.servers[name] = new
84
85     def get_session(self, url):
86         for srv in self.servers:
87             d = self.servers[srv]
88             if url.startswith(d['baseuri']):
89                 return d['session']
90
91         raise ValueError("Unknown URL: %s" % url)
92
93     def get(self, url, krb=False, **kwargs):
94         session = self.get_session(url)
95         allow_redirects = False
96         if krb:
97             # python-requests-kerberos isn't too bright about doing mutual
98             # authentication and it tries to do it on any non-401 response
99             # which doesn't work in our case since we follow redirects.
100             kerberos_auth = HTTPKerberosAuth(mutual_authentication=OPTIONAL)
101             kwargs['auth'] = kerberos_auth
102             allow_redirects = True
103         return session.get(url, allow_redirects=allow_redirects, **kwargs)
104
105     def post(self, url, **kwargs):
106         session = self.get_session(url)
107         return session.post(url, allow_redirects=False, **kwargs)
108
109     def access(self, action, url, krb=False, **kwargs):
110         action = string.lower(action)
111         if action == 'get':
112             return self.get(url, krb, **kwargs)
113         elif action == 'post':
114             return self.post(url, **kwargs)
115         else:
116             raise ValueError("Unknown action type: [%s]" % action)
117
118     def new_url(self, referer, action):
119         if action.startswith('/'):
120             u = urlparse.urlparse(referer)
121             return '%s://%s%s' % (u.scheme, u.netloc, action)
122         return action
123
124     def get_form_data(self, page, form_id, input_fields):
125         form_selector = '//form'
126         if form_id:
127             form_selector += '[@id="%s"]' % form_id
128         values = []
129         action = page.first_value('%s/@action' % form_selector)
130         values.append(action)
131         method = page.first_value('%s/@method' % form_selector)
132         values.append(method)
133         for field in input_fields:
134             value = page.all_values('%s/input/@%s' % (form_selector,
135                                                       field))
136             values.append(value)
137         return values
138
139     def handle_login_form(self, idp, page):
140         if type(page) != PageTree:
141             raise TypeError("Expected PageTree object")
142
143         srv = self.servers[idp]
144
145         try:
146             results = self.get_form_data(page, "login_form", ["name", "value"])
147             action_url = results[0]
148             method = results[1]
149             names = results[2]
150             values = results[3]
151             if action_url is None:
152                 raise Exception
153         except Exception:  # pylint: disable=broad-except
154             raise WrongPage("Not a Login Form Page")
155
156         referer = page.make_referer()
157         headers = {'referer': referer}
158         payload = {}
159         for i in range(0, len(names)):
160             payload[names[i]] = values[i]
161
162         # replace known values
163         payload['login_name'] = srv['user']
164         payload['login_password'] = srv['pwd']
165
166         return [method, self.new_url(referer, action_url),
167                 {'headers': headers, 'data': payload}]
168
169     def handle_return_form(self, page):
170         if type(page) != PageTree:
171             raise TypeError("Expected PageTree object")
172
173         try:
174             results = self.get_form_data(page, "saml-response",
175                                          ["name", "value"])
176             action_url = results[0]
177             if action_url is None:
178                 raise Exception
179             method = results[1]
180             names = results[2]
181             values = results[3]
182         except Exception:  # pylint: disable=broad-except
183             raise WrongPage("Not a Return Form Page")
184
185         referer = page.make_referer()
186         headers = {'referer': referer}
187
188         payload = {}
189         for i in range(0, len(names)):
190             payload[names[i]] = values[i]
191
192         return [method, self.new_url(referer, action_url),
193                 {'headers': headers, 'data': payload}]
194
195     def handle_openid_form(self, page):
196         if type(page) != PageTree:
197             raise TypeError("Expected PageTree object")
198
199         if not page.first_value('//title/text()') == \
200                 'OpenID transaction in progress':
201             raise WrongPage('Not OpenID autosubmit form')
202
203         try:
204             results = self.get_form_data(page, None,
205                                          ["name", "value"])
206             action_url = results[0]
207             if action_url is None:
208                 raise Exception
209             method = results[1]
210             names = results[2]
211             values = results[3]
212         except Exception:  # pylint: disable=broad-except
213             raise WrongPage("Not OpenID autosubmit form")
214
215         referer = page.make_referer()
216         headers = {'referer': referer}
217
218         payload = {}
219         for i in range(0, len(names)):
220             payload[names[i]] = values[i]
221
222         return [method, self.new_url(referer, action_url),
223                 {'headers': headers, 'data': payload}]
224
225     def handle_openid_consent_form(self, page):
226         if type(page) != PageTree:
227             raise TypeError("Expected PageTree object")
228
229         try:
230             results = self.get_form_data(page, "consent_form",
231                                          ['name', 'value'])
232             action_url = results[0]
233             if action_url is None:
234                 raise Exception
235             method = results[1]
236             names = results[2]
237             values = results[3]
238         except Exception:  # pylint: disable=broad-except
239             raise WrongPage("Not an OpenID Consent Form Page")
240
241         referer = page.make_referer()
242         headers = {'referer': referer}
243
244         payload = {}
245         for i in range(0, len(names)):
246             payload[names[i]] = values[i]
247
248         # Replace known values
249         payload['decided_allow'] = 'Allow'
250
251         return [method, self.new_url(referer, action_url),
252                 {'headers': headers, 'data': payload}]
253
254     def fetch_page(self, idp, target_url, follow_redirect=True, krb=False):
255         """
256         Fetch a page and parse the response code to determine what to do
257         next.
258
259         The login process consists of redirections (302/303) and
260         potentially an unauthorized (401). For the case of unauthorized
261         try the page returned in case of fallback authentication.
262         """
263         url = target_url
264         action = 'get'
265         args = {}
266
267         while True:
268             r = self.access(action, url, krb=krb, **args)
269             if r.status_code == 303 or r.status_code == 302:
270                 if not follow_redirect:
271                     return PageTree(r)
272                 url = r.headers['location']
273                 action = 'get'
274                 args = {}
275             elif r.status_code == 401:
276                 page = PageTree(r)
277                 if r.headers.get('WWW-Authenticate', None) is None:
278                     return page
279
280                 # Fall back, hopefully to testauth authentication.
281                 try:
282                     (action, url, args) = self.handle_login_form(idp, page)
283                     continue
284                 except WrongPage:
285                     pass
286             elif r.status_code == 200:
287                 page = PageTree(r)
288
289                 try:
290                     (action, url, args) = self.handle_login_form(idp, page)
291                     continue
292                 except WrongPage:
293                     pass
294
295                 try:
296                     (action, url, args) = self.handle_return_form(page)
297                     continue
298                 except WrongPage:
299                     pass
300
301                 try:
302                     (action, url, args) = self.handle_openid_consent_form(page)
303                     continue
304                 except WrongPage:
305                     pass
306
307                 try:
308                     (action, url, args) = self.handle_openid_form(page)
309                     continue
310                 except WrongPage:
311                     pass
312
313                 # Either we got what we wanted, or we have to stop anyway
314                 return page
315             else:
316                 raise ValueError("Unhandled status (%d) on url %s" % (
317                                  r.status_code, url))
318
319     def auth_to_idp(self, idp, krb=False):
320
321         srv = self.servers[idp]
322         target_url = '%s/%s/' % (srv['baseuri'], idp)
323
324         r = self.access('get', target_url, krb=krb)
325         if r.status_code != 200:
326             raise ValueError("Access to idp failed: %s" % repr(r))
327
328         page = PageTree(r)
329         page.expected_value('//div[@id="content"]/p/a/text()', 'Log In')
330         href = page.first_value('//div[@id="content"]/p/a/@href')
331         url = self.new_url(target_url, href)
332
333         page = self.fetch_page(idp, url, krb=krb)
334
335         page.expected_value('//div[@id="welcome"]/p/text()',
336                             'Welcome %s!' % srv['user'])
337
338     def logout_from_idp(self, idp):
339
340         srv = self.servers[idp]
341         target_url = '%s/%s/logout' % (srv['baseuri'], idp)
342
343         r = self.access('get', target_url)
344         if r.status_code != 200:
345             raise ValueError("Logout from idp failed: %s" % repr(r))
346
347     def get_sp_metadata(self, idp, sp):
348         idpsrv = self.servers[idp]
349         idpuri = idpsrv['baseuri']
350
351         spuri = self.servers[sp]['baseuri']
352
353         return (idpuri, requests.get('%s/saml2/metadata' % spuri))
354
355     def add_sp_metadata(self, idp, sp, rest=False):
356         expected_status = 200
357         (idpuri, m) = self.get_sp_metadata(idp, sp)
358         url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp)
359         headers = {'referer': url}
360         if rest:
361             expected_status = 201
362             payload = {'metadata': m.content}
363             headers['content-type'] = 'application/x-www-form-urlencoded'
364             url = '%s/%s/rest/providers/saml2/SPS/%s' % (idpuri, idp, sp)
365             r = self.post(url, headers=headers, data=urlencode(payload))
366         else:
367             metafile = {'metafile': m.content}
368             payload = {'name': sp}
369             r = self.post(url, headers=headers, data=payload, files=metafile)
370         if r.status_code != expected_status:
371             raise ValueError('Failed to post SP data [%s]' % repr(r))
372
373         if not rest:
374             page = PageTree(r)
375             page.expected_value('//div[@class="alert alert-success"]/p/text()',
376                                 'SP Successfully added')
377
378     def set_sp_default_nameids(self, idp, sp, nameids):
379         """
380         nameids is a list of Name ID formats to enable
381         """
382         idpsrv = self.servers[idp]
383         idpuri = idpsrv['baseuri']
384         url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (idpuri, idp, sp)
385         headers = {'referer': url}
386         headers['content-type'] = 'application/x-www-form-urlencoded'
387         payload = {'submit': 'Submit',
388                    'allowed_nameids': ', '.join(nameids)}
389         r = idpsrv['session'].post(url, headers=headers,
390                                    data=payload)
391         if r.status_code != 200:
392             raise ValueError('Failed to post SP data [%s]' % repr(r))
393
394     # pylint: disable=dangerous-default-value
395     def set_attributes_and_mapping(self, idp, mapping=[], attrs=[],
396                                    spname=None):
397         """
398         Set allowed attributes and mapping in the IDP or the SP. In the
399         case of the SP both allowed attributes and the mapping need to
400         be provided. An empty option for either means delete all values.
401
402         mapping is a list of list of rules of the form:
403            [['from-1', 'to-1'], ['from-2', 'from-2']]
404
405         ex. [['*', '*'], ['fullname', 'namefull']]
406
407         attrs is the list of attributes that will be allowed:
408            ['fullname', 'givenname', 'surname']
409         """
410         idpsrv = self.servers[idp]
411         idpuri = idpsrv['baseuri']
412         if spname:  # per-SP setting
413             url = '%s/%s/admin/providers/saml2/admin/sp/%s' % (
414                 idpuri, idp, spname)
415             mapname = 'Attribute Mapping'
416             attrname = 'Allowed Attributes'
417         else:  # global default
418             url = '%s/%s/admin/providers/saml2' % (idpuri, idp)
419             mapname = 'default attribute mapping'
420             attrname = 'default allowed attributes'
421
422         headers = {'referer': url}
423         headers['content-type'] = 'application/x-www-form-urlencoded'
424         payload = {'submit': 'Submit'}
425         count = 0
426         for m in mapping:
427             payload['%s %s-from' % (mapname, count)] = m[0]
428             payload['%s %s-to' % (mapname, count)] = m[1]
429             count += 1
430         count = 0
431         for attr in attrs:
432             payload['%s %s-name' % (attrname, count)] = attr
433             count += 1
434         r = idpsrv['session'].post(url, headers=headers,
435                                    data=payload)
436         if r.status_code != 200:
437             raise ValueError('Failed to post IDP data [%s]' % repr(r))
438
439     def fetch_rest_page(self, idpname, uri):
440         """
441         idpname - the name of the IDP to fetch the page from
442         uri - the URI of the page to retrieve
443
444         The URL for the request is built from known-information in
445         the session.
446
447         returns dict if successful
448         returns ValueError if the output is unparseable
449         """
450         baseurl = self.servers[idpname].get('baseuri')
451         page = self.fetch_page(
452             idpname,
453             '%s%s' % (baseurl, uri)
454         )
455         return json.loads(page.text)
456
457     def get_rest_sp(self, idpname, spname=None):
458         if spname is None:
459             uri = '/%s/rest/providers/saml2/SPS/' % idpname
460         else:
461             uri = '/%s/rest/providers/saml2/SPS/%s' % (idpname, spname)
462
463         return self.fetch_rest_page(idpname, uri)