Allow SP registration from ipsilon-client-install
[cascardo/ipsilon.git] / tests / test1.py
index 6e76f88..3e0cfc2 100755 (executable)
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 
-from lxml import html
+from helpers.common import IpsilonTestBase  # pylint: disable=relative-import
+from helpers.http import HttpSessions  # pylint: disable=relative-import
 import os
 import pwd
-import requests
 import sys
-import urlparse
+from string import Template
+
+idp_g = {'TEMPLATES': '${TESTDIR}/templates/install',
+         'CONFDIR': '${TESTDIR}/etc',
+         'DATADIR': '${TESTDIR}/lib',
+         'HTTPDCONFD': '${TESTDIR}/${NAME}/conf.d',
+         'STATICDIR': '${ROOTDIR}',
+         'BINDIR': '${ROOTDIR}/ipsilon',
+         'WSGI_SOCKET_PREFIX': '${TESTDIR}/${NAME}/logs/wsgi'}
+
+
+idp_a = {'hostname': '${ADDRESS}:${PORT}',
+         'admin_user': '${TEST_USER}',
+         'system_user': '${TEST_USER}',
+         'instance': '${NAME}',
+         'secure': 'no',
+         'testauth': 'yes',
+         'pam': 'no',
+         'krb': 'no',
+         'ipa': 'no',
+         'server_debugging': 'True'}
+
+
+sp_g = {'HTTPDCONFD': '${TESTDIR}/${NAME}/conf.d',
+        'SAML2_TEMPLATE': '${TESTDIR}/templates/install/saml2/sp.conf',
+        'SAML2_CONFFILE': '${TESTDIR}/${NAME}/conf.d/ipsilon-saml.conf',
+        'SAML2_HTTPDIR': '${TESTDIR}/${NAME}/saml2'}
+
+
+sp_a = {'hostname': '${ADDRESS}:${PORT}',
+        'saml_idp_metadata': 'http://127.0.0.10:45080/idp1/saml2/metadata',
+        'saml_secure_setup': 'False',
+        'saml_auth': '/sp',
+        'httpd_user': '${TEST_USER}'}
+
+sp2_g = {'HTTPDCONFD': '${TESTDIR}/${NAME}/conf.d',
+         'SAML2_TEMPLATE': '${TESTDIR}/templates/install/saml2/sp.conf',
+         'SAML2_CONFFILE': '${TESTDIR}/${NAME}/conf.d/ipsilon-saml.conf',
+         'SAML2_HTTPDIR': '${TESTDIR}/${NAME}/saml2'}
+
+sp2_a = {'hostname': '${ADDRESS}:${PORT}',
+         'saml_idp_url': 'http://127.0.0.10:45080/idp1',
+         'admin_user': '${TEST_USER}',
+         'admin_password': '${TESTDIR}/pw.txt',
+         'saml_sp_name': 'sp2',
+         'saml_secure_setup': 'False',
+         'saml_auth': '/sp',
+         'httpd_user': '${TEST_USER}'}
+
+
+def fixup_sp_httpd(httpdir):
+    location = """
+
+Alias /sp ${HTTPDIR}/sp
+
+<Directory ${HTTPDIR}/sp>
+    Require all granted
+</Directory>
+"""
+    index = """WORKS!"""
+
+    t = Template(location)
+    text = t.substitute({'HTTPDIR': httpdir})
+    with open(httpdir + '/conf.d/ipsilon-saml.conf', 'a') as f:
+        f.write(text)
+
+    os.mkdir(httpdir + '/sp')
+    with open(httpdir + '/sp/index.html', 'w') as f:
+        f.write(index)
+
+
+class IpsilonTest(IpsilonTestBase):
+
+    def __init__(self):
+        super(IpsilonTest, self).__init__('test1', __file__)
+
+    def setup_servers(self, env=None):
+        print "Installing IDP server"
+        name = 'idp1'
+        addr = '127.0.0.10'
+        port = '45080'
+        idp = self.generate_profile(idp_g, idp_a, name, addr, port)
+        conf = self.setup_idp_server(idp, name, addr, port, env)
+
+        print "Starting IDP's httpd server"
+        self.start_http_server(conf, env)
+
+        print "Installing first SP server"
+        name = 'sp1'
+        addr = '127.0.0.11'
+        port = '45081'
+        sp = self.generate_profile(sp_g, sp_a, name, addr, port)
+        conf = self.setup_sp_server(sp, name, addr, port, env)
+        fixup_sp_httpd(os.path.dirname(conf))
+
+        print "Starting first SP's httpd server"
+        self.start_http_server(conf, env)
+
+        print "Installing second SP server"
+        name = 'sp2'
+        addr = '127.0.0.11'
+        port = '45082'
+        sp = self.generate_profile(sp2_g, sp2_a, name, addr, port)
+        with open(os.path.dirname(sp) + '/pw.txt', 'a') as f:
+            f.write('ipsilon')
+        conf = self.setup_sp_server(sp, name, addr, port, env)
+        os.remove(os.path.dirname(sp) + '/pw.txt')
+        fixup_sp_httpd(os.path.dirname(conf))
+
+        print "Starting second SP's httpd server"
+        self.start_http_server(conf, env)
 
 
-def get_session(srvs, url):
-    for srv in srvs:
-        if url.startswith(srv['baseuri']):
-            return srv['session']
-
-    raise ValueError("Unknown URL: %s" % url)
-
-
-def get_url(srvs, url, **kwargs):
-    session = get_session(srvs, url)
-    return session.get(url, allow_redirects=False, **kwargs)
-
-
-def post_url(srvs, url, **kwargs):
-    session = get_session(srvs, url)
-    return session.post(url, allow_redirects=False, **kwargs)
-
-
-def access_url(action, srvs, url, **kwargs):
-    if action == 'get':
-        return get_url(srvs, url, **kwargs)
-    elif action == 'post':
-        return post_url(srvs, url, **kwargs)
-    else:
-        raise ValueError("Unknown action type: [%s]" % action)
-
-
-def get_new_url(referer, action):
-    if action.startswith('/'):
-        u = urlparse.urlparse(referer)
-        return '%s://%s%s' % (u.scheme, u.netloc, action)
-    return action
-
-
-def parse_first(tree, rule):
-    result = tree.xpath(rule)
-    if type(result) is list:
-        if len(result) > 0:
-            result = result[0]
-        else:
-            result = None
-    return result
-
-
-def parse_list(tree, rule):
-    result = tree.xpath(rule)
-    if type(result) is list:
-        return result
-    return [result]
-
-
-def handle_login_form(idp, r):
-    tree = html.fromstring(r.text)
-    try:
-        action_url = parse_first(tree, '//form[@id="login_form"]/@action')
-        method = parse_first(tree, '//form[@id="login_form"]/@method')
-    except Exception:  # pylint: disable=broad-except
-        return []
-
-    if action_url is None:
-        return []
-
-    headers = {'referer': r.url}
-    payload = {'login_name': idp['user'],
-               'login_password': idp['pass']}
-
-    return [method,
-            get_new_url(r.url, action_url),
-            {'headers': headers, 'data': payload}]
-
-
-def handle_return_form(r):
-    tree = html.fromstring(r.text)
-    try:
-        action_url = parse_first(tree, '//form[@id="saml-response"]/@action')
-        method = parse_first(tree, '//form[@id="saml-response"]/@method')
-        names = parse_list(tree, '//form[@id="saml-response"]/input/@name')
-        values = parse_list(tree, '//form[@id="saml-response"]/input/@value')
-    except Exception:  # pylint: disable=broad-except
-        return []
-
-    if action_url is None:
-        return []
-
-    headers = {'referer': r.url}
-    payload = {}
-    for i in range(0, len(names)):
-        payload[names[i]] = values[i]
-
-    return [method,
-            get_new_url(r.url, action_url),
-            {'headers': headers, 'data': payload}]
-
-
-def go_to_url(srvs, idp, start_url, target_url):
-
-    url = start_url
-    action = 'get'
-    args = {}
-
-    good = True
-    while good:
-        r = access_url(action, srvs, url, **args)  # pylint: disable=star-args
-        if r.status_code == 303:
-            url = r.headers['location']
-            action = 'get'
-            args = {}
-        elif r.status_code == 200:
-            if url == target_url:
-                return r.text
-
-            result = handle_login_form(idp, r)
-            if result:
-                action = result[0]
-                url = result[1]
-                args = result[2]
-                continue
-
-            result = handle_return_form(r)
-            if result:
-                action = result[0]
-                url = result[1]
-                args = result[2]
-                continue
-
-            raise ValueError("Unhandled Success code at url %s" % url)
-
-        else:
-            good = False
-
-    raise ValueError("Unhandled status (%d) on url %s" % (r.status_code, url))
-
-
-def auth_to_idp(idp):
-
-    target_url = '%s/%s/' % (idp['baseuri'], idp['name'])
-    srvs = [idp]
+if __name__ == '__main__':
 
-    r = access_url('get', srvs, target_url)
-    if r.status_code != 200:
-        print >> sys.stderr, " ERROR: Access to idp failed: %s" % repr(r)
-        return False
+    idpname = 'idp1'
+    sp1name = 'sp1'
+    sp2name = 'sp2'
+    user = pwd.getpwuid(os.getuid())[0]
 
-    tree = html.fromstring(r.text)
-    try:
-        expected = 'Log In'
-        login = parse_first(tree, '//div[@id="content"]/p/a/text()')
-        if login != expected:
-            print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected,
-                                                                     login)
-        href = parse_first(tree, '//div[@id="content"]/p/a/@href')
-        start_url = get_new_url(target_url, href)
-    except Exception, e:  # pylint: disable=broad-except
-        print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e)
-        return False
+    sess = HttpSessions()
+    sess.add_server(idpname, 'http://127.0.0.10:45080', user, 'ipsilon')
+    sess.add_server(sp1name, 'http://127.0.0.11:45081')
+    sess.add_server(sp2name, 'http://127.0.0.11:45082')
 
+    print "test1: Authenticate to IDP ...",
     try:
-        page = go_to_url(srvs, idp, start_url, target_url)
+        sess.auth_to_idp(idpname)
     except Exception, e:  # pylint: disable=broad-except
         print >> sys.stderr, " ERROR: %s" % repr(e)
-        return False
+        sys.exit(1)
+    print " SUCCESS"
 
-    tree = html.fromstring(page)
+    print "test1: Add first SP Metadata to IDP ...",
     try:
-        welcome = parse_first(tree, '//div[@id="welcome"]/p/text()')
+        sess.add_sp_metadata(idpname, sp1name)
     except Exception, e:  # pylint: disable=broad-except
-        print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e)
-        return False
-
-    expected = 'Welcome %s!' % idp['user']
-    if welcome != expected:
-        print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected,
-                                                                 welcome)
-        return False
-
-    return True
-
-
-def add_sp_metadata(idp, sp):
-    url = '%s/%s/admin/providers/saml2/admin/new' % (idp['baseuri'],
-                                                     idp['name'])
-    headers = {'referer': url}
-    payload = {'name': sp['name']}
-    m = requests.get('%s/saml2/metadata' % sp['baseuri'])
-    metafile = {'metafile': m.content}
-    r = idp['session'].post(url, headers=headers,
-                            data=payload, files=metafile)
-    if r.status_code != 200:
-        print >> sys.stderr, " ERROR: %s" % repr(r)
-        return False
-
-    tree = html.fromstring(r.text)
-    try:
-        alert = parse_first(tree,
-                            '//div[@class="alert alert-success"]/p/text()')
-    except Exception, e:  # pylint: disable=broad-except
-        print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e)
-        return False
-
-    expected = 'SP Successfully added'
-    if alert != expected:
-        print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected,
-                                                                 alert)
-        return False
-
-    return True
-
-
-if __name__ == '__main__':
-    basedir = sys.argv[1]
-
-    idpsrv = {'name': 'idp1',
-              'baseuri': 'http://127.0.0.10:45080',
-              'session': requests.Session(),
-              'user': pwd.getpwuid(os.getuid())[0],
-              'pass': 'ipsilon'}
-    spsrv = {'name': 'sp1',
-             'baseuri': 'http://127.0.0.11:45081',
-             'session': requests.Session()}
-
-    print "test1: Authenticate to IDP ...",
-    if not auth_to_idp(idpsrv):
+        print >> sys.stderr, " ERROR: %s" % repr(e)
         sys.exit(1)
     print " SUCCESS"
 
-    print "test1: Add SP Metadata to IDP ...",
-    if not add_sp_metadata(idpsrv, spsrv):
+    print "test1: Access first SP Protected Area ...",
+    try:
+        page = sess.fetch_page(idpname, 'http://127.0.0.11:45081/sp/')
+        page.expected_value('text()', 'WORKS!')
+    except ValueError, e:
+        print >> sys.stderr, " ERROR: %s" % repr(e)
         sys.exit(1)
     print " SUCCESS"
 
-    print "test1: Access SP Protected Area ...",
-    servers = [idpsrv, spsrv]
-    spurl = '%s/sp/' % (spsrv['baseuri'])
+    print "test1: Access second SP Protected Area ...",
     try:
-        text = go_to_url(servers, idpsrv, spurl, spurl)
+        page = sess.fetch_page(idpname, 'http://127.0.0.11:45082/sp/')
+        page.expected_value('text()', 'WORKS!')
     except ValueError, e:
         print >> sys.stderr, " ERROR: %s" % repr(e)
         sys.exit(1)
-    if text != "WORKS!":
-        print >> sys.stderr, "ERROR: Expected [WORKS!], got [%s]" % text
-        sys.exit(1)
     print " SUCCESS"
+
+    print "test1: Try authentication failure ...",
+    newsess = HttpSessions()
+    newsess.add_server(idpname, 'http://127.0.0.10:45080', user, 'wrong')
+    try:
+        newsess.auth_to_idp(idpname)
+        print >> sys.stderr, " ERROR: Authentication should have failed"
+        sys.exit(1)
+    except Exception, e:  # pylint: disable=broad-except
+        print " SUCCESS"