From 239a4ce2b22e25e1759c39a890bd0a506b6f3795 Mon Sep 17 00:00:00 2001 From: Simo Sorce Date: Sun, 1 Jun 2014 15:47:44 -0400 Subject: [PATCH] Add first test, checks client/server installs work Signed-off-by: Simo Sorce --- Makefile | 1 + tests/test1.cfg | 36 +++++++ tests/test1.py | 270 ++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 307 insertions(+) create mode 100644 tests/test1.cfg create mode 100755 tests/test1.py diff --git a/Makefile b/Makefile index 439caa9..0d7e01a 100644 --- a/Makefile +++ b/Makefile @@ -41,6 +41,7 @@ lp-test: test: lp-test PYTHONPATH=./ ./ipsilon/tools/saml2metadata.py + ./tests/tests.py --test=test1 sdist: python setup.py sdist diff --git a/tests/test1.cfg b/tests/test1.cfg new file mode 100644 index 0000000..81ae254 --- /dev/null +++ b/tests/test1.cfg @@ -0,0 +1,36 @@ +[tests] +servers=idp1:127.0.0.10:45080 +clients=sp1:127.0.0.11:45081 + +[idp1_globals] +TEMPLATES=${TESTDIR}/templates/install +CONFDIR=${TESTDIR}/etc +DATADIR=${TESTDIR}/lib +HTTPDCONFD=${TESTDIR}/idp1/conf.d +STATICDIR=${ROOTDIR} +BINDIR=${ROOTDIR}/ipsilon +WSGI_SOCKET_PREFIX=${TESTDIR}/idp1/logs/wsgi + +[idp1_arguments] +hostname=127.0.0.10:45080 +admin_user=${TEST_USER} +system_user=${TEST_USER} +instance=idp1 +saml2_secure=no +testauth=yes +pam=no +krb=no +ipa=no + +[sp1_globals] +HTTPDCONFD=${TESTDIR}/sp1/conf.d +SAML2_TEMPLATE=${TESTDIR}/templates/install/saml2/sp.conf +SAML2_CONFFILE=${TESTDIR}/sp1/conf.d/ipsilon-saml.conf +SAML2_HTTPDIR=${TESTDIR}/sp1/saml2 + +[sp1_arguments] +hostname=127.0.0.11:45081 +saml_idp_metadata=http://127.0.0.10:45080/idp1/saml2/metadata +saml_secure_setup=False +saml_auth=/sp +httpd_user=${TEST_USER} diff --git a/tests/test1.py b/tests/test1.py new file mode 100755 index 0000000..6e76f88 --- /dev/null +++ b/tests/test1.py @@ -0,0 +1,270 @@ +#!/usr/bin/python +# +# Copyright (C) 2014 Simo Sorce +# +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +from lxml import html +import os +import pwd +import requests +import sys +import urlparse + + +def get_session(srvs, url): + for srv in srvs: + if url.startswith(srv['baseuri']): + return srv['session'] + + raise ValueError("Unknown URL: %s" % url) + + +def get_url(srvs, url, **kwargs): + session = get_session(srvs, url) + return session.get(url, allow_redirects=False, **kwargs) + + +def post_url(srvs, url, **kwargs): + session = get_session(srvs, url) + return session.post(url, allow_redirects=False, **kwargs) + + +def access_url(action, srvs, url, **kwargs): + if action == 'get': + return get_url(srvs, url, **kwargs) + elif action == 'post': + return post_url(srvs, url, **kwargs) + else: + raise ValueError("Unknown action type: [%s]" % action) + + +def get_new_url(referer, action): + if action.startswith('/'): + u = urlparse.urlparse(referer) + return '%s://%s%s' % (u.scheme, u.netloc, action) + return action + + +def parse_first(tree, rule): + result = tree.xpath(rule) + if type(result) is list: + if len(result) > 0: + result = result[0] + else: + result = None + return result + + +def parse_list(tree, rule): + result = tree.xpath(rule) + if type(result) is list: + return result + return [result] + + +def handle_login_form(idp, r): + tree = html.fromstring(r.text) + try: + action_url = parse_first(tree, '//form[@id="login_form"]/@action') + method = parse_first(tree, '//form[@id="login_form"]/@method') + except Exception: # pylint: disable=broad-except + return [] + + if action_url is None: + return [] + + headers = {'referer': r.url} + payload = {'login_name': idp['user'], + 'login_password': idp['pass']} + + return [method, + get_new_url(r.url, action_url), + {'headers': headers, 'data': payload}] + + +def handle_return_form(r): + tree = html.fromstring(r.text) + try: + action_url = parse_first(tree, '//form[@id="saml-response"]/@action') + method = parse_first(tree, '//form[@id="saml-response"]/@method') + names = parse_list(tree, '//form[@id="saml-response"]/input/@name') + values = parse_list(tree, '//form[@id="saml-response"]/input/@value') + except Exception: # pylint: disable=broad-except + return [] + + if action_url is None: + return [] + + headers = {'referer': r.url} + payload = {} + for i in range(0, len(names)): + payload[names[i]] = values[i] + + return [method, + get_new_url(r.url, action_url), + {'headers': headers, 'data': payload}] + + +def go_to_url(srvs, idp, start_url, target_url): + + url = start_url + action = 'get' + args = {} + + good = True + while good: + r = access_url(action, srvs, url, **args) # pylint: disable=star-args + if r.status_code == 303: + url = r.headers['location'] + action = 'get' + args = {} + elif r.status_code == 200: + if url == target_url: + return r.text + + result = handle_login_form(idp, r) + if result: + action = result[0] + url = result[1] + args = result[2] + continue + + result = handle_return_form(r) + if result: + action = result[0] + url = result[1] + args = result[2] + continue + + raise ValueError("Unhandled Success code at url %s" % url) + + else: + good = False + + raise ValueError("Unhandled status (%d) on url %s" % (r.status_code, url)) + + +def auth_to_idp(idp): + + target_url = '%s/%s/' % (idp['baseuri'], idp['name']) + srvs = [idp] + + r = access_url('get', srvs, target_url) + if r.status_code != 200: + print >> sys.stderr, " ERROR: Access to idp failed: %s" % repr(r) + return False + + tree = html.fromstring(r.text) + try: + expected = 'Log In' + login = parse_first(tree, '//div[@id="content"]/p/a/text()') + if login != expected: + print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected, + login) + href = parse_first(tree, '//div[@id="content"]/p/a/@href') + start_url = get_new_url(target_url, href) + except Exception, e: # pylint: disable=broad-except + print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e) + return False + + try: + page = go_to_url(srvs, idp, start_url, target_url) + except Exception, e: # pylint: disable=broad-except + print >> sys.stderr, " ERROR: %s" % repr(e) + return False + + tree = html.fromstring(page) + try: + welcome = parse_first(tree, '//div[@id="welcome"]/p/text()') + except Exception, e: # pylint: disable=broad-except + print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e) + return False + + expected = 'Welcome %s!' % idp['user'] + if welcome != expected: + print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected, + welcome) + return False + + return True + + +def add_sp_metadata(idp, sp): + url = '%s/%s/admin/providers/saml2/admin/new' % (idp['baseuri'], + idp['name']) + headers = {'referer': url} + payload = {'name': sp['name']} + m = requests.get('%s/saml2/metadata' % sp['baseuri']) + metafile = {'metafile': m.content} + r = idp['session'].post(url, headers=headers, + data=payload, files=metafile) + if r.status_code != 200: + print >> sys.stderr, " ERROR: %s" % repr(r) + return False + + tree = html.fromstring(r.text) + try: + alert = parse_first(tree, + '//div[@class="alert alert-success"]/p/text()') + except Exception, e: # pylint: disable=broad-except + print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e) + return False + + expected = 'SP Successfully added' + if alert != expected: + print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected, + alert) + return False + + return True + + +if __name__ == '__main__': + basedir = sys.argv[1] + + idpsrv = {'name': 'idp1', + 'baseuri': 'http://127.0.0.10:45080', + 'session': requests.Session(), + 'user': pwd.getpwuid(os.getuid())[0], + 'pass': 'ipsilon'} + spsrv = {'name': 'sp1', + 'baseuri': 'http://127.0.0.11:45081', + 'session': requests.Session()} + + print "test1: Authenticate to IDP ...", + if not auth_to_idp(idpsrv): + sys.exit(1) + print " SUCCESS" + + print "test1: Add SP Metadata to IDP ...", + if not add_sp_metadata(idpsrv, spsrv): + sys.exit(1) + print " SUCCESS" + + print "test1: Access SP Protected Area ...", + servers = [idpsrv, spsrv] + spurl = '%s/sp/' % (spsrv['baseuri']) + try: + text = go_to_url(servers, idpsrv, spurl, spurl) + except ValueError, e: + print >> sys.stderr, " ERROR: %s" % repr(e) + sys.exit(1) + if text != "WORKS!": + print >> sys.stderr, "ERROR: Expected [WORKS!], got [%s]" % text + sys.exit(1) + print " SUCCESS" -- 2.20.1