Move parsing code into helpers module
[cascardo/ipsilon.git] / tests / helpers / http.py
1 #!/usr/bin/python
2 #
3 # Copyright (C) 2014  Simo Sorce <simo@redhat.com>
4 #
5 # see file 'COPYING' for use and warranty information
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19
20
21 from lxml import html
22 import requests
23 import string
24 import urlparse
25
26
27 class WrongPage(Exception):
28     pass
29
30
31 class PageTree(object):
32
33     def __init__(self, result):
34         self.result = result
35         self.text = result.text
36         self._tree = None
37
38     @property
39     def tree(self):
40         if not self._tree:
41             self._tree = html.fromstring(self.text)
42         return self._tree
43
44     def first_value(self, rule):
45         result = self.tree.xpath(rule)
46         if type(result) is list:
47             if len(result) > 0:
48                 result = result[0]
49             else:
50                 result = None
51         return result
52
53     def all_values(self, rule):
54         result = self.tree.xpath(rule)
55         if type(result) is list:
56             return result
57         return [result]
58
59     def make_referer(self):
60         return self.result.url
61
62     def expected_value(self, rule, expected):
63         value = self.first_value(rule)
64         if value != expected:
65             raise ValueError("Expected [%s], got [%s]" % (expected, value))
66
67
68 class HttpSessions(object):
69
70     def __init__(self):
71         self.servers = dict()
72
73     def add_server(self, name, baseuri, user=None, pwd=None):
74         new = {'baseuri': baseuri,
75                'session': requests.Session()}
76         if user:
77             new['user'] = user
78         if pwd:
79             new['pwd'] = pwd
80         self.servers[name] = new
81
82     def get_session(self, url):
83         for srv in self.servers:
84             d = self.servers[srv]
85             if url.startswith(d['baseuri']):
86                 return d['session']
87
88         raise ValueError("Unknown URL: %s" % url)
89
90     def get(self, url, **kwargs):
91         session = self.get_session(url)
92         return session.get(url, allow_redirects=False, **kwargs)
93
94     def post(self, url, **kwargs):
95         session = self.get_session(url)
96         return session.post(url, allow_redirects=False, **kwargs)
97
98     def access(self, action, url, **kwargs):
99         action = string.lower(action)
100         if action == 'get':
101             return self.get(url, **kwargs)
102         elif action == 'post':
103             return self.post(url, **kwargs)
104         else:
105             raise ValueError("Unknown action type: [%s]" % action)
106
107     def new_url(self, referer, action):
108         if action.startswith('/'):
109             u = urlparse.urlparse(referer)
110             return '%s://%s%s' % (u.scheme, u.netloc, action)
111         return action
112
113     def get_form_data(self, page, form_id, input_fields):
114         values = []
115         action = page.first_value('//form[@id="%s"]/@action' % form_id)
116         values.append(action)
117         method = page.first_value('//form[@id="%s"]/@method' % form_id)
118         values.append(method)
119         for field in input_fields:
120             value = page.all_values('//form[@id="%s"]/input/@%s' % (form_id,
121                                                                     field))
122             values.append(value)
123         return values
124
125     def handle_login_form(self, idp, page):
126         if type(page) != PageTree:
127             raise TypeError("Expected PageTree object")
128
129         srv = self.servers[idp]
130
131         try:
132             results = self.get_form_data(page, "login_form", [])
133             action_url = results[0]
134             method = results[1]
135             if action_url is None:
136                 raise Exception
137         except Exception:  # pylint: disable=broad-except
138             raise WrongPage("Not a Login Form Page")
139
140         referer = page.make_referer()
141         headers = {'referer': referer}
142         payload = {'login_name': srv['user'],
143                    'login_password': srv['pwd']}
144
145         return [method, self.new_url(referer, action_url),
146                 {'headers': headers, 'data': payload}]
147
148     def handle_return_form(self, page):
149         if type(page) != PageTree:
150             raise TypeError("Expected PageTree object")
151
152         try:
153             results = self.get_form_data(page, "saml-response",
154                                          ["name", "value"])
155             action_url = results[0]
156             if action_url is None:
157                 raise Exception
158             method = results[1]
159             names = results[2]
160             values = results[3]
161         except Exception:  # pylint: disable=broad-except
162             raise WrongPage("Not a Return Form Page")
163
164         referer = page.make_referer()
165         headers = {'referer': referer}
166
167         payload = {}
168         for i in range(0, len(names)):
169             payload[names[i]] = values[i]
170
171         return [method, self.new_url(referer, action_url),
172                 {'headers': headers, 'data': payload}]
173
174     def fetch_page(self, idp, target_url):
175         url = target_url
176         action = 'get'
177         args = {}
178
179         while True:
180             r = self.access(action, url, **args)  # pylint: disable=star-args
181             if r.status_code == 303:
182                 url = r.headers['location']
183                 action = 'get'
184                 args = {}
185             elif r.status_code == 200:
186                 page = PageTree(r)
187
188                 try:
189                     (action, url, args) = self.handle_login_form(idp, page)
190                     continue
191                 except WrongPage:
192                     pass
193
194                 try:
195                     (action, url, args) = self.handle_return_form(page)
196                     continue
197                 except WrongPage:
198                     pass
199
200                 # Either we got what we wanted, or we have to stop anyway
201                 return page
202             else:
203                 raise ValueError("Unhandled status (%d) on url %s" % (
204                                  r.status_code, url))
205
206     def auth_to_idp(self, idp):
207
208         srv = self.servers[idp]
209         target_url = '%s/%s/' % (srv['baseuri'], idp)
210
211         r = self.access('get', target_url)
212         if r.status_code != 200:
213             raise ValueError("Access to idp failed: %s" % repr(r))
214
215         page = PageTree(r)
216         page.expected_value('//div[@id="content"]/p/a/text()', 'Log In')
217         href = page.first_value('//div[@id="content"]/p/a/@href')
218         url = self.new_url(target_url, href)
219
220         page = self.fetch_page(idp, url)
221         page.expected_value('//div[@id="welcome"]/p/text()',
222                             'Welcome %s!' % srv['user'])
223
224     def add_sp_metadata(self, idp, sp):
225         idpsrv = self.servers[idp]
226         idpuri = idpsrv['baseuri']
227         spuri = self.servers[sp]['baseuri']
228
229         url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp)
230         headers = {'referer': url}
231         payload = {'name': sp}
232         m = requests.get('%s/saml2/metadata' % spuri)
233         metafile = {'metafile': m.content}
234         r = idpsrv['session'].post(url, headers=headers,
235                                    data=payload, files=metafile)
236         if r.status_code != 200:
237             raise ValueError('Failed to post SP data [%s]' % repr(r))
238
239         page = PageTree(r)
240         page.expected_value('//div[@class="alert alert-success"]/p/text()',
241                             'SP Successfully added')