Add first test, checks client/server installs work
authorSimo Sorce <simo@redhat.com>
Sun, 1 Jun 2014 19:47:44 +0000 (15:47 -0400)
committerSimo Sorce <simo@redhat.com>
Wed, 4 Jun 2014 14:26:34 +0000 (10:26 -0400)
Signed-off-by: Simo Sorce <simo@redhat.com>
Makefile
tests/test1.cfg [new file with mode: 0644]
tests/test1.py [new file with mode: 0755]

index 439caa9..0d7e01a 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -41,6 +41,7 @@ lp-test:
 
 test: lp-test
        PYTHONPATH=./ ./ipsilon/tools/saml2metadata.py
 
 test: lp-test
        PYTHONPATH=./ ./ipsilon/tools/saml2metadata.py
+       ./tests/tests.py --test=test1
 
 sdist:
        python setup.py sdist
 
 sdist:
        python setup.py sdist
diff --git a/tests/test1.cfg b/tests/test1.cfg
new file mode 100644 (file)
index 0000000..81ae254
--- /dev/null
@@ -0,0 +1,36 @@
+[tests]
+servers=idp1:127.0.0.10:45080
+clients=sp1:127.0.0.11:45081
+
+[idp1_globals]
+TEMPLATES=${TESTDIR}/templates/install
+CONFDIR=${TESTDIR}/etc
+DATADIR=${TESTDIR}/lib
+HTTPDCONFD=${TESTDIR}/idp1/conf.d
+STATICDIR=${ROOTDIR}
+BINDIR=${ROOTDIR}/ipsilon
+WSGI_SOCKET_PREFIX=${TESTDIR}/idp1/logs/wsgi
+
+[idp1_arguments]
+hostname=127.0.0.10:45080
+admin_user=${TEST_USER}
+system_user=${TEST_USER}
+instance=idp1
+saml2_secure=no
+testauth=yes
+pam=no
+krb=no
+ipa=no
+
+[sp1_globals]
+HTTPDCONFD=${TESTDIR}/sp1/conf.d
+SAML2_TEMPLATE=${TESTDIR}/templates/install/saml2/sp.conf
+SAML2_CONFFILE=${TESTDIR}/sp1/conf.d/ipsilon-saml.conf
+SAML2_HTTPDIR=${TESTDIR}/sp1/saml2
+
+[sp1_arguments]
+hostname=127.0.0.11:45081
+saml_idp_metadata=http://127.0.0.10:45080/idp1/saml2/metadata
+saml_secure_setup=False
+saml_auth=/sp
+httpd_user=${TEST_USER}
diff --git a/tests/test1.py b/tests/test1.py
new file mode 100755 (executable)
index 0000000..6e76f88
--- /dev/null
@@ -0,0 +1,270 @@
+#!/usr/bin/python
+#
+# Copyright (C) 2014  Simo Sorce <simo@redhat.com>
+#
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+from lxml import html
+import os
+import pwd
+import requests
+import sys
+import urlparse
+
+
+def get_session(srvs, url):
+    for srv in srvs:
+        if url.startswith(srv['baseuri']):
+            return srv['session']
+
+    raise ValueError("Unknown URL: %s" % url)
+
+
+def get_url(srvs, url, **kwargs):
+    session = get_session(srvs, url)
+    return session.get(url, allow_redirects=False, **kwargs)
+
+
+def post_url(srvs, url, **kwargs):
+    session = get_session(srvs, url)
+    return session.post(url, allow_redirects=False, **kwargs)
+
+
+def access_url(action, srvs, url, **kwargs):
+    if action == 'get':
+        return get_url(srvs, url, **kwargs)
+    elif action == 'post':
+        return post_url(srvs, url, **kwargs)
+    else:
+        raise ValueError("Unknown action type: [%s]" % action)
+
+
+def get_new_url(referer, action):
+    if action.startswith('/'):
+        u = urlparse.urlparse(referer)
+        return '%s://%s%s' % (u.scheme, u.netloc, action)
+    return action
+
+
+def parse_first(tree, rule):
+    result = tree.xpath(rule)
+    if type(result) is list:
+        if len(result) > 0:
+            result = result[0]
+        else:
+            result = None
+    return result
+
+
+def parse_list(tree, rule):
+    result = tree.xpath(rule)
+    if type(result) is list:
+        return result
+    return [result]
+
+
+def handle_login_form(idp, r):
+    tree = html.fromstring(r.text)
+    try:
+        action_url = parse_first(tree, '//form[@id="login_form"]/@action')
+        method = parse_first(tree, '//form[@id="login_form"]/@method')
+    except Exception:  # pylint: disable=broad-except
+        return []
+
+    if action_url is None:
+        return []
+
+    headers = {'referer': r.url}
+    payload = {'login_name': idp['user'],
+               'login_password': idp['pass']}
+
+    return [method,
+            get_new_url(r.url, action_url),
+            {'headers': headers, 'data': payload}]
+
+
+def handle_return_form(r):
+    tree = html.fromstring(r.text)
+    try:
+        action_url = parse_first(tree, '//form[@id="saml-response"]/@action')
+        method = parse_first(tree, '//form[@id="saml-response"]/@method')
+        names = parse_list(tree, '//form[@id="saml-response"]/input/@name')
+        values = parse_list(tree, '//form[@id="saml-response"]/input/@value')
+    except Exception:  # pylint: disable=broad-except
+        return []
+
+    if action_url is None:
+        return []
+
+    headers = {'referer': r.url}
+    payload = {}
+    for i in range(0, len(names)):
+        payload[names[i]] = values[i]
+
+    return [method,
+            get_new_url(r.url, action_url),
+            {'headers': headers, 'data': payload}]
+
+
+def go_to_url(srvs, idp, start_url, target_url):
+
+    url = start_url
+    action = 'get'
+    args = {}
+
+    good = True
+    while good:
+        r = access_url(action, srvs, url, **args)  # pylint: disable=star-args
+        if r.status_code == 303:
+            url = r.headers['location']
+            action = 'get'
+            args = {}
+        elif r.status_code == 200:
+            if url == target_url:
+                return r.text
+
+            result = handle_login_form(idp, r)
+            if result:
+                action = result[0]
+                url = result[1]
+                args = result[2]
+                continue
+
+            result = handle_return_form(r)
+            if result:
+                action = result[0]
+                url = result[1]
+                args = result[2]
+                continue
+
+            raise ValueError("Unhandled Success code at url %s" % url)
+
+        else:
+            good = False
+
+    raise ValueError("Unhandled status (%d) on url %s" % (r.status_code, url))
+
+
+def auth_to_idp(idp):
+
+    target_url = '%s/%s/' % (idp['baseuri'], idp['name'])
+    srvs = [idp]
+
+    r = access_url('get', srvs, target_url)
+    if r.status_code != 200:
+        print >> sys.stderr, " ERROR: Access to idp failed: %s" % repr(r)
+        return False
+
+    tree = html.fromstring(r.text)
+    try:
+        expected = 'Log In'
+        login = parse_first(tree, '//div[@id="content"]/p/a/text()')
+        if login != expected:
+            print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected,
+                                                                     login)
+        href = parse_first(tree, '//div[@id="content"]/p/a/@href')
+        start_url = get_new_url(target_url, href)
+    except Exception, e:  # pylint: disable=broad-except
+        print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e)
+        return False
+
+    try:
+        page = go_to_url(srvs, idp, start_url, target_url)
+    except Exception, e:  # pylint: disable=broad-except
+        print >> sys.stderr, " ERROR: %s" % repr(e)
+        return False
+
+    tree = html.fromstring(page)
+    try:
+        welcome = parse_first(tree, '//div[@id="welcome"]/p/text()')
+    except Exception, e:  # pylint: disable=broad-except
+        print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e)
+        return False
+
+    expected = 'Welcome %s!' % idp['user']
+    if welcome != expected:
+        print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected,
+                                                                 welcome)
+        return False
+
+    return True
+
+
+def add_sp_metadata(idp, sp):
+    url = '%s/%s/admin/providers/saml2/admin/new' % (idp['baseuri'],
+                                                     idp['name'])
+    headers = {'referer': url}
+    payload = {'name': sp['name']}
+    m = requests.get('%s/saml2/metadata' % sp['baseuri'])
+    metafile = {'metafile': m.content}
+    r = idp['session'].post(url, headers=headers,
+                            data=payload, files=metafile)
+    if r.status_code != 200:
+        print >> sys.stderr, " ERROR: %s" % repr(r)
+        return False
+
+    tree = html.fromstring(r.text)
+    try:
+        alert = parse_first(tree,
+                            '//div[@class="alert alert-success"]/p/text()')
+    except Exception, e:  # pylint: disable=broad-except
+        print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e)
+        return False
+
+    expected = 'SP Successfully added'
+    if alert != expected:
+        print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected,
+                                                                 alert)
+        return False
+
+    return True
+
+
+if __name__ == '__main__':
+    basedir = sys.argv[1]
+
+    idpsrv = {'name': 'idp1',
+              'baseuri': 'http://127.0.0.10:45080',
+              'session': requests.Session(),
+              'user': pwd.getpwuid(os.getuid())[0],
+              'pass': 'ipsilon'}
+    spsrv = {'name': 'sp1',
+             'baseuri': 'http://127.0.0.11:45081',
+             'session': requests.Session()}
+
+    print "test1: Authenticate to IDP ...",
+    if not auth_to_idp(idpsrv):
+        sys.exit(1)
+    print " SUCCESS"
+
+    print "test1: Add SP Metadata to IDP ...",
+    if not add_sp_metadata(idpsrv, spsrv):
+        sys.exit(1)
+    print " SUCCESS"
+
+    print "test1: Access SP Protected Area ...",
+    servers = [idpsrv, spsrv]
+    spurl = '%s/sp/' % (spsrv['baseuri'])
+    try:
+        text = go_to_url(servers, idpsrv, spurl, spurl)
+    except ValueError, e:
+        print >> sys.stderr, " ERROR: %s" % repr(e)
+        sys.exit(1)
+    if text != "WORKS!":
+        print >> sys.stderr, "ERROR: Expected [WORKS!], got [%s]" % text
+        sys.exit(1)
+    print " SUCCESS"