Add first test, checks client/server installs work
[cascardo/ipsilon.git] / tests / test1.py
1 #!/usr/bin/python
2 #
3 # Copyright (C) 2014  Simo Sorce <simo@redhat.com>
4 #
5 # see file 'COPYING' for use and warranty information
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19
20
21 from lxml import html
22 import os
23 import pwd
24 import requests
25 import sys
26 import urlparse
27
28
29 def get_session(srvs, url):
30     for srv in srvs:
31         if url.startswith(srv['baseuri']):
32             return srv['session']
33
34     raise ValueError("Unknown URL: %s" % url)
35
36
37 def get_url(srvs, url, **kwargs):
38     session = get_session(srvs, url)
39     return session.get(url, allow_redirects=False, **kwargs)
40
41
42 def post_url(srvs, url, **kwargs):
43     session = get_session(srvs, url)
44     return session.post(url, allow_redirects=False, **kwargs)
45
46
47 def access_url(action, srvs, url, **kwargs):
48     if action == 'get':
49         return get_url(srvs, url, **kwargs)
50     elif action == 'post':
51         return post_url(srvs, url, **kwargs)
52     else:
53         raise ValueError("Unknown action type: [%s]" % action)
54
55
56 def get_new_url(referer, action):
57     if action.startswith('/'):
58         u = urlparse.urlparse(referer)
59         return '%s://%s%s' % (u.scheme, u.netloc, action)
60     return action
61
62
63 def parse_first(tree, rule):
64     result = tree.xpath(rule)
65     if type(result) is list:
66         if len(result) > 0:
67             result = result[0]
68         else:
69             result = None
70     return result
71
72
73 def parse_list(tree, rule):
74     result = tree.xpath(rule)
75     if type(result) is list:
76         return result
77     return [result]
78
79
80 def handle_login_form(idp, r):
81     tree = html.fromstring(r.text)
82     try:
83         action_url = parse_first(tree, '//form[@id="login_form"]/@action')
84         method = parse_first(tree, '//form[@id="login_form"]/@method')
85     except Exception:  # pylint: disable=broad-except
86         return []
87
88     if action_url is None:
89         return []
90
91     headers = {'referer': r.url}
92     payload = {'login_name': idp['user'],
93                'login_password': idp['pass']}
94
95     return [method,
96             get_new_url(r.url, action_url),
97             {'headers': headers, 'data': payload}]
98
99
100 def handle_return_form(r):
101     tree = html.fromstring(r.text)
102     try:
103         action_url = parse_first(tree, '//form[@id="saml-response"]/@action')
104         method = parse_first(tree, '//form[@id="saml-response"]/@method')
105         names = parse_list(tree, '//form[@id="saml-response"]/input/@name')
106         values = parse_list(tree, '//form[@id="saml-response"]/input/@value')
107     except Exception:  # pylint: disable=broad-except
108         return []
109
110     if action_url is None:
111         return []
112
113     headers = {'referer': r.url}
114     payload = {}
115     for i in range(0, len(names)):
116         payload[names[i]] = values[i]
117
118     return [method,
119             get_new_url(r.url, action_url),
120             {'headers': headers, 'data': payload}]
121
122
123 def go_to_url(srvs, idp, start_url, target_url):
124
125     url = start_url
126     action = 'get'
127     args = {}
128
129     good = True
130     while good:
131         r = access_url(action, srvs, url, **args)  # pylint: disable=star-args
132         if r.status_code == 303:
133             url = r.headers['location']
134             action = 'get'
135             args = {}
136         elif r.status_code == 200:
137             if url == target_url:
138                 return r.text
139
140             result = handle_login_form(idp, r)
141             if result:
142                 action = result[0]
143                 url = result[1]
144                 args = result[2]
145                 continue
146
147             result = handle_return_form(r)
148             if result:
149                 action = result[0]
150                 url = result[1]
151                 args = result[2]
152                 continue
153
154             raise ValueError("Unhandled Success code at url %s" % url)
155
156         else:
157             good = False
158
159     raise ValueError("Unhandled status (%d) on url %s" % (r.status_code, url))
160
161
162 def auth_to_idp(idp):
163
164     target_url = '%s/%s/' % (idp['baseuri'], idp['name'])
165     srvs = [idp]
166
167     r = access_url('get', srvs, target_url)
168     if r.status_code != 200:
169         print >> sys.stderr, " ERROR: Access to idp failed: %s" % repr(r)
170         return False
171
172     tree = html.fromstring(r.text)
173     try:
174         expected = 'Log In'
175         login = parse_first(tree, '//div[@id="content"]/p/a/text()')
176         if login != expected:
177             print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected,
178                                                                      login)
179         href = parse_first(tree, '//div[@id="content"]/p/a/@href')
180         start_url = get_new_url(target_url, href)
181     except Exception, e:  # pylint: disable=broad-except
182         print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e)
183         return False
184
185     try:
186         page = go_to_url(srvs, idp, start_url, target_url)
187     except Exception, e:  # pylint: disable=broad-except
188         print >> sys.stderr, " ERROR: %s" % repr(e)
189         return False
190
191     tree = html.fromstring(page)
192     try:
193         welcome = parse_first(tree, '//div[@id="welcome"]/p/text()')
194     except Exception, e:  # pylint: disable=broad-except
195         print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e)
196         return False
197
198     expected = 'Welcome %s!' % idp['user']
199     if welcome != expected:
200         print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected,
201                                                                  welcome)
202         return False
203
204     return True
205
206
207 def add_sp_metadata(idp, sp):
208     url = '%s/%s/admin/providers/saml2/admin/new' % (idp['baseuri'],
209                                                      idp['name'])
210     headers = {'referer': url}
211     payload = {'name': sp['name']}
212     m = requests.get('%s/saml2/metadata' % sp['baseuri'])
213     metafile = {'metafile': m.content}
214     r = idp['session'].post(url, headers=headers,
215                             data=payload, files=metafile)
216     if r.status_code != 200:
217         print >> sys.stderr, " ERROR: %s" % repr(r)
218         return False
219
220     tree = html.fromstring(r.text)
221     try:
222         alert = parse_first(tree,
223                             '//div[@class="alert alert-success"]/p/text()')
224     except Exception, e:  # pylint: disable=broad-except
225         print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e)
226         return False
227
228     expected = 'SP Successfully added'
229     if alert != expected:
230         print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected,
231                                                                  alert)
232         return False
233
234     return True
235
236
237 if __name__ == '__main__':
238     basedir = sys.argv[1]
239
240     idpsrv = {'name': 'idp1',
241               'baseuri': 'http://127.0.0.10:45080',
242               'session': requests.Session(),
243               'user': pwd.getpwuid(os.getuid())[0],
244               'pass': 'ipsilon'}
245     spsrv = {'name': 'sp1',
246              'baseuri': 'http://127.0.0.11:45081',
247              'session': requests.Session()}
248
249     print "test1: Authenticate to IDP ...",
250     if not auth_to_idp(idpsrv):
251         sys.exit(1)
252     print " SUCCESS"
253
254     print "test1: Add SP Metadata to IDP ...",
255     if not add_sp_metadata(idpsrv, spsrv):
256         sys.exit(1)
257     print " SUCCESS"
258
259     print "test1: Access SP Protected Area ...",
260     servers = [idpsrv, spsrv]
261     spurl = '%s/sp/' % (spsrv['baseuri'])
262     try:
263         text = go_to_url(servers, idpsrv, spurl, spurl)
264     except ValueError, e:
265         print >> sys.stderr, " ERROR: %s" % repr(e)
266         sys.exit(1)
267     if text != "WORKS!":
268         print >> sys.stderr, "ERROR: Expected [WORKS!], got [%s]" % text
269         sys.exit(1)
270     print " SUCCESS"