3 # Copyright (C) 2014 Simo Sorce <simo@redhat.com>
5 # see file 'COPYING' for use and warranty information
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
29 def get_session(srvs, url):
31 if url.startswith(srv['baseuri']):
34 raise ValueError("Unknown URL: %s" % url)
37 def get_url(srvs, url, **kwargs):
38 session = get_session(srvs, url)
39 return session.get(url, allow_redirects=False, **kwargs)
42 def post_url(srvs, url, **kwargs):
43 session = get_session(srvs, url)
44 return session.post(url, allow_redirects=False, **kwargs)
47 def access_url(action, srvs, url, **kwargs):
49 return get_url(srvs, url, **kwargs)
50 elif action == 'post':
51 return post_url(srvs, url, **kwargs)
53 raise ValueError("Unknown action type: [%s]" % action)
56 def get_new_url(referer, action):
57 if action.startswith('/'):
58 u = urlparse.urlparse(referer)
59 return '%s://%s%s' % (u.scheme, u.netloc, action)
63 def parse_first(tree, rule):
64 result = tree.xpath(rule)
65 if type(result) is list:
73 def parse_list(tree, rule):
74 result = tree.xpath(rule)
75 if type(result) is list:
80 def handle_login_form(idp, r):
81 tree = html.fromstring(r.text)
83 action_url = parse_first(tree, '//form[@id="login_form"]/@action')
84 method = parse_first(tree, '//form[@id="login_form"]/@method')
85 except Exception: # pylint: disable=broad-except
88 if action_url is None:
91 headers = {'referer': r.url}
92 payload = {'login_name': idp['user'],
93 'login_password': idp['pass']}
96 get_new_url(r.url, action_url),
97 {'headers': headers, 'data': payload}]
100 def handle_return_form(r):
101 tree = html.fromstring(r.text)
103 action_url = parse_first(tree, '//form[@id="saml-response"]/@action')
104 method = parse_first(tree, '//form[@id="saml-response"]/@method')
105 names = parse_list(tree, '//form[@id="saml-response"]/input/@name')
106 values = parse_list(tree, '//form[@id="saml-response"]/input/@value')
107 except Exception: # pylint: disable=broad-except
110 if action_url is None:
113 headers = {'referer': r.url}
115 for i in range(0, len(names)):
116 payload[names[i]] = values[i]
119 get_new_url(r.url, action_url),
120 {'headers': headers, 'data': payload}]
123 def go_to_url(srvs, idp, start_url, target_url):
131 r = access_url(action, srvs, url, **args) # pylint: disable=star-args
132 if r.status_code == 303:
133 url = r.headers['location']
136 elif r.status_code == 200:
137 if url == target_url:
140 result = handle_login_form(idp, r)
147 result = handle_return_form(r)
154 raise ValueError("Unhandled Success code at url %s" % url)
159 raise ValueError("Unhandled status (%d) on url %s" % (r.status_code, url))
162 def auth_to_idp(idp):
164 target_url = '%s/%s/' % (idp['baseuri'], idp['name'])
167 r = access_url('get', srvs, target_url)
168 if r.status_code != 200:
169 print >> sys.stderr, " ERROR: Access to idp failed: %s" % repr(r)
172 tree = html.fromstring(r.text)
175 login = parse_first(tree, '//div[@id="content"]/p/a/text()')
176 if login != expected:
177 print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected,
179 href = parse_first(tree, '//div[@id="content"]/p/a/@href')
180 start_url = get_new_url(target_url, href)
181 except Exception, e: # pylint: disable=broad-except
182 print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e)
186 page = go_to_url(srvs, idp, start_url, target_url)
187 except Exception, e: # pylint: disable=broad-except
188 print >> sys.stderr, " ERROR: %s" % repr(e)
191 tree = html.fromstring(page)
193 welcome = parse_first(tree, '//div[@id="welcome"]/p/text()')
194 except Exception, e: # pylint: disable=broad-except
195 print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e)
198 expected = 'Welcome %s!' % idp['user']
199 if welcome != expected:
200 print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected,
207 def add_sp_metadata(idp, sp):
208 url = '%s/%s/admin/providers/saml2/admin/new' % (idp['baseuri'],
210 headers = {'referer': url}
211 payload = {'name': sp['name']}
212 m = requests.get('%s/saml2/metadata' % sp['baseuri'])
213 metafile = {'metafile': m.content}
214 r = idp['session'].post(url, headers=headers,
215 data=payload, files=metafile)
216 if r.status_code != 200:
217 print >> sys.stderr, " ERROR: %s" % repr(r)
220 tree = html.fromstring(r.text)
222 alert = parse_first(tree,
223 '//div[@class="alert alert-success"]/p/text()')
224 except Exception, e: # pylint: disable=broad-except
225 print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e)
228 expected = 'SP Successfully added'
229 if alert != expected:
230 print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected,
237 if __name__ == '__main__':
238 basedir = sys.argv[1]
240 idpsrv = {'name': 'idp1',
241 'baseuri': 'http://127.0.0.10:45080',
242 'session': requests.Session(),
243 'user': pwd.getpwuid(os.getuid())[0],
245 spsrv = {'name': 'sp1',
246 'baseuri': 'http://127.0.0.11:45081',
247 'session': requests.Session()}
249 print "test1: Authenticate to IDP ...",
250 if not auth_to_idp(idpsrv):
254 print "test1: Add SP Metadata to IDP ...",
255 if not add_sp_metadata(idpsrv, spsrv):
259 print "test1: Access SP Protected Area ...",
260 servers = [idpsrv, spsrv]
261 spurl = '%s/sp/' % (spsrv['baseuri'])
263 text = go_to_url(servers, idpsrv, spurl, spurl)
264 except ValueError, e:
265 print >> sys.stderr, " ERROR: %s" % repr(e)
268 print >> sys.stderr, "ERROR: Expected [WORKS!], got [%s]" % text