Add OpenID test suite
authorPatrick Uiterwijk <puiterwijk@redhat.com>
Tue, 28 Apr 2015 17:11:12 +0000 (19:11 +0200)
committerPatrick Uiterwijk <puiterwijk@redhat.com>
Tue, 28 Apr 2015 18:53:06 +0000 (20:53 +0200)
This tests core OpenID and the Attribute Exchange,
Simple Registration and Teams extensions.

Using a small wsgi tool because mod_auth_openid does
not support all extensions.

Signed-off-by: Patrick Uiterwijk <puiterwijk@redhat.com>
Reviewed-by: Rob Crittenden <rcritten@redhat.com>
Makefile
ipsilon/login/authtest.py
ipsilon/providers/openidp.py
tests/blobs/openid_app.py [new file with mode: 0644]
tests/helpers/http.py
tests/openid.py [new file with mode: 0755]
tests/testmapping.py

index dce214a..2434898 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -98,6 +98,7 @@ tests: wrappers
        PYTHONPATH=./ ./tests/tests.py --test=pgdb
        PYTHONPATH=./ ./tests/tests.py --test=fconf
        PYTHONPATH=./ ./tests/tests.py --test=ldap
+       PYTHONPATH=./ ./tests/tests.py --test=openid
 
 test: lp-test unittests tests
 
index 7fc4160..5f0ff6e 100644 (file)
@@ -36,7 +36,8 @@ class TestAuth(LoginFormBase):
                     'givenname': 'Test User',
                     'surname': username,
                     'fullname': 'Test User %s' % username,
-                    'email': '%s@example.com' % username
+                    'email': '%s@example.com' % username,
+                    '_groups': [username]
                 }
                 return self.lm.auth_successful(self.trans,
                                                username, 'password', testdata)
index 4e47d3e..032c406 100644 (file)
@@ -143,6 +143,8 @@ class Installer(ProviderInstaller):
                            help='Configure OpenID Provider')
         group.add_argument('--openid-dburi',
                            help='OpenID database URI')
+        group.add_argument('--openid-extensions', default='',
+                           help='List of OpenID Extensions to enable')
 
     def configure(self, opts):
         if opts['openid'] != 'yes':
@@ -160,10 +162,11 @@ class Installer(ProviderInstaller):
         po.wipe_data()
         po.wipe_config_values()
         config = {'endpoint url': url,
-                  'identity_url_template': '%sid/%%(username)s' % url,
+                  'identity url template': '%sid/%%(username)s' % url,
                   'database url': opts['openid_dburi'] or
                   opts['database_url'] % {
-                      'datadir': opts['data_dir'], 'dbname': 'openid'}}
+                      'datadir': opts['data_dir'], 'dbname': 'openid'},
+                  'enabled extensions': opts['openid_extensions']}
         po.save_plugin_config(config)
 
         # Update global config to add login plugin
diff --git a/tests/blobs/openid_app.py b/tests/blobs/openid_app.py
new file mode 100644 (file)
index 0000000..db80bbd
--- /dev/null
@@ -0,0 +1,114 @@
+# Copyright (C) 2015  Ipsilon project Contributors, for licensee see COPYING
+import sys
+sys.stdout = sys.stderr
+
+import cherrypy
+import os
+import pwd
+
+from openid.consumer import consumer
+from openid.extensions import sreg, ax
+from openid_teams import teams
+
+
+class OpenIDApp(object):
+    def index(self, extensions):
+        self.extensions = extensions == 'YES'
+        oidconsumer = consumer.Consumer(dict(), None)
+        try:
+            request = oidconsumer.begin('http://127.0.0.10:45080/idp1/')
+        except Exception as ex:
+            return 'ERROR: %s' % ex
+
+        if request is None:
+            return 'ERROR: No request'
+
+        # Attach extensions here
+        if self.extensions:
+            request.addExtension(sreg.SRegRequest(
+                required=['nickname', 'email', 'timezone']))
+            ax_req = ax.FetchRequest()
+            ax_req_name = ax.AttrInfo('http://schema.openid.net/namePerson')
+            ax_req.add(ax_req_name)
+            request.addExtension(ax_req)
+            username = pwd.getpwuid(os.getuid())[0]
+            request.addExtension(teams.TeamsRequest(requested=[username]))
+
+        # Build and send final request
+        trust_root = cherrypy.url()
+        return_to = trust_root + 'finish'
+        if request.shouldSendRedirect():
+            redirect_url = request.redirectURL(
+                trust_root, return_to)
+            raise cherrypy.HTTPRedirect(redirect_url)
+        else:
+            return request.htmlMarkup(
+                trust_root, return_to)
+    index.exposed = True
+
+    def finish(self, **args):
+        oidconsumer = consumer.Consumer(dict(), None)
+        info = oidconsumer.complete(cherrypy.request.params, cherrypy.url())
+        display_identifier = info.getDisplayIdentifier()
+
+        if info.status == consumer.FAILURE and display_identifier:
+            return 'ERROR:Verification of %s failed: %s' % (
+                display_identifier, info.message)
+        elif info.status == consumer.CANCEL:
+            return 'ERROR: Cancelled'
+        elif info.status == consumer.SUCCESS:
+            username = pwd.getpwuid(os.getuid())[0]
+            expected_identifier = 'http://127.0.0.10:45080/idp1/openid/id/%s/'\
+                % username
+            if expected_identifier != display_identifier:
+                return 'ERROR: Wrong id returned: %s != %s' % (
+                    expected_identifier,
+                    display_identifier)
+
+            if self.extensions:
+                sreg_resp = sreg.SRegResponse.fromSuccessResponse(info)
+                teams_resp = teams.TeamsResponse.fromSuccessResponse(info)
+                ax_resp = ax.FetchResponse.fromSuccessResponse(info)
+
+                if sreg_resp is None:
+                    return 'ERROR: No sreg!'
+                elif teams_resp is None:
+                    return 'ERROR: No teams!'
+                elif ax_resp is None:
+                    return 'ERROR: No AX!'
+
+                # Check values
+                expected_name = 'Test User %s' % username
+                expected_email = '%s@example.com' % username
+
+                ax_name = ax_resp.data[
+                    'http://schema.openid.net/namePerson'][0]
+                sreg_email = sreg_resp.data['email']
+
+                if ax_name != expected_name:
+                    return 'ERROR: Wrong name returned: %s != %s' % (
+                        expected_name,
+                        ax_name)
+
+                if sreg_email != expected_email:
+                    return 'ERROR: Wrong email returned: %s != %s' % (
+                        expected_email,
+                        sreg_email)
+
+                if username not in teams_resp.teams:
+                    return 'ERROR: User not in self-named group (%s not in %s)' %\
+                        (username, teams_resp.teams)
+
+            if self.extensions:
+                return 'SUCCESS, WITH EXTENSIONS'
+            else:
+                return 'SUCCESS, WITHOUT EXTENSIONS'
+        else:
+            return 'ERROR: Strange error: %s' % info.message
+    finish.exposed = True
+
+
+cherrypy.config['environment'] = 'embedded'
+
+application = cherrypy.Application(OpenIDApp(),
+                                   script_name=None, config=None)
index 2478e2a..ff696d4 100755 (executable)
@@ -113,14 +113,17 @@ class HttpSessions(object):
         return action
 
     def get_form_data(self, page, form_id, input_fields):
+        form_selector = '//form'
+        if form_id:
+            form_selector += '[@id="%s"]' % form_id
         values = []
-        action = page.first_value('//form[@id="%s"]/@action' % form_id)
+        action = page.first_value('%s/@action' % form_selector)
         values.append(action)
-        method = page.first_value('//form[@id="%s"]/@method' % form_id)
+        method = page.first_value('%s/@method' % form_selector)
         values.append(method)
         for field in input_fields:
-            value = page.all_values('//form[@id="%s"]/input/@%s' % (form_id,
-                                                                    field))
+            value = page.all_values('%s/input/@%s' % (form_selector,
+                                                      field))
             values.append(value)
         return values
 
@@ -180,6 +183,65 @@ class HttpSessions(object):
         return [method, self.new_url(referer, action_url),
                 {'headers': headers, 'data': payload}]
 
+    def handle_openid_form(self, page):
+        if type(page) != PageTree:
+            raise TypeError("Expected PageTree object")
+
+        if not page.first_value('//title/text()') == \
+                'OpenID transaction in progress':
+            raise WrongPage('Not OpenID autosubmit form')
+
+        try:
+            results = self.get_form_data(page, None,
+                                         ["name", "value"])
+            action_url = results[0]
+            if action_url is None:
+                raise Exception
+            method = results[1]
+            names = results[2]
+            values = results[3]
+        except Exception:  # pylint: disable=broad-except
+            raise WrongPage("Not OpenID autosubmit form")
+
+        referer = page.make_referer()
+        headers = {'referer': referer}
+
+        payload = {}
+        for i in range(0, len(names)):
+            payload[names[i]] = values[i]
+
+        return [method, self.new_url(referer, action_url),
+                {'headers': headers, 'data': payload}]
+
+    def handle_openid_consent_form(self, page):
+        if type(page) != PageTree:
+            raise TypeError("Expected PageTree object")
+
+        try:
+            results = self.get_form_data(page, "consent_form",
+                                         ['name', 'value'])
+            action_url = results[0]
+            if action_url is None:
+                raise Exception
+            method = results[1]
+            names = results[2]
+            values = results[3]
+        except Exception:  # pylint: disable=broad-except
+            raise WrongPage("Not an OpenID Consent Form Page")
+
+        referer = page.make_referer()
+        headers = {'referer': referer}
+
+        payload = {}
+        for i in range(0, len(names)):
+            payload[names[i]] = values[i]
+
+        # Replace known values
+        payload['decided_allow'] = 'Allow'
+
+        return [method, self.new_url(referer, action_url),
+                {'headers': headers, 'data': payload}]
+
     def fetch_page(self, idp, target_url, follow_redirect=True):
         url = target_url
         action = 'get'
@@ -187,7 +249,7 @@ class HttpSessions(object):
 
         while True:
             r = self.access(action, url, **args)  # pylint: disable=star-args
-            if r.status_code == 303:
+            if r.status_code == 303 or r.status_code == 302:
                 if not follow_redirect:
                     return PageTree(r)
                 url = r.headers['location']
@@ -208,6 +270,18 @@ class HttpSessions(object):
                 except WrongPage:
                     pass
 
+                try:
+                    (action, url, args) = self.handle_openid_consent_form(page)
+                    continue
+                except WrongPage:
+                    pass
+
+                try:
+                    (action, url, args) = self.handle_openid_form(page)
+                    continue
+                except WrongPage:
+                    pass
+
                 # Either we got what we wanted, or we have to stop anyway
                 return page
             else:
diff --git a/tests/openid.py b/tests/openid.py
new file mode 100755 (executable)
index 0000000..ebc92ba
--- /dev/null
@@ -0,0 +1,132 @@
+#!/usr/bin/python
+#
+# Copyright (C) 2014  Simo Sorce <simo@redhat.com>
+#
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+from helpers.common import IpsilonTestBase  # pylint: disable=relative-import
+from helpers.http import HttpSessions  # pylint: disable=relative-import
+import os
+import pwd
+import sys
+import inspect
+from string import Template
+
+idp_g = {'TEMPLATES': '${TESTDIR}/templates/install',
+         'CONFDIR': '${TESTDIR}/etc',
+         'DATADIR': '${TESTDIR}/lib',
+         'HTTPDCONFD': '${TESTDIR}/${NAME}/conf.d',
+         'STATICDIR': '${ROOTDIR}',
+         'BINDIR': '${ROOTDIR}/ipsilon',
+         'WSGI_SOCKET_PREFIX': '${TESTDIR}/${NAME}/logs/wsgi'}
+
+
+idp_a = {'hostname': '${ADDRESS}:${PORT}',
+         'admin_user': '${TEST_USER}',
+         'system_user': '${TEST_USER}',
+         'instance': '${NAME}',
+         'secure': 'no',
+         'testauth': 'yes',
+         'openid': 'yes',
+         'openid_extensions': 'Attribute Exchange,Simple Registration,Teams',
+         'pam': 'no',
+         'krb': 'no',
+         'ipa': 'no',
+         'server_debugging': 'True'}
+
+
+def fixup_sp_httpd(httpdir, testdir):
+    client_wsgi = """
+
+WSGIScriptAlias / ${TESTDIR}/blobs/openid_app.py
+
+<Directory ${TESTDIR}/blobs>
+    Require all granted
+</Directory>
+"""
+    t = Template(client_wsgi)
+    text = t.substitute({'TESTDIR': testdir})
+    with open(httpdir + '/conf.d/ipsilon-openid-client.conf', 'a') as f:
+        f.write(text)
+
+
+class IpsilonTest(IpsilonTestBase):
+
+    def __init__(self):
+        super(IpsilonTest, self).__init__('openid', __file__)
+
+    def setup_servers(self, env=None):
+        print "Installing IDP server"
+        name = 'idp1'
+        addr = '127.0.0.10'
+        port = '45080'
+        idp = self.generate_profile(idp_g, idp_a, name, addr, port)
+        conf = self.setup_idp_server(idp, name, addr, port, env)
+
+        print "Starting IDP's httpd server"
+        self.start_http_server(conf, env)
+
+        print "Installing first SP server"
+        name = 'sp1'
+        addr = '127.0.0.11'
+        port = '45081'
+        conf = self.setup_http(name, addr, port)
+        testdir = os.path.dirname(os.path.abspath(inspect.getfile(
+            inspect.currentframe())))
+        fixup_sp_httpd(os.path.dirname(conf), testdir)
+
+        print "Starting SP's httpd server"
+        self.start_http_server(conf, env)
+
+
+if __name__ == '__main__':
+
+    idpname = 'idp1'
+    sp1name = 'sp1'
+    user = pwd.getpwuid(os.getuid())[0]
+
+    sess = HttpSessions()
+    sess.add_server(idpname, 'http://127.0.0.10:45080', user, 'ipsilon')
+    sess.add_server(sp1name, 'http://127.0.0.11:45081')
+
+    print "openid: Authenticate to IDP ...",
+    try:
+        sess.auth_to_idp(idpname)
+    except Exception as e:  # pylint: disable=broad-except
+        print >> sys.stderr, " ERROR: %s" % repr(e)
+        sys.exit(1)
+    print " SUCCESS"
+
+    print "openid: Run OpenID Protocol ...",
+    try:
+        page = sess.fetch_page(idpname,
+                               'http://127.0.0.11:45081/?extensions=NO')
+        page.expected_value('text()', 'SUCCESS, WITHOUT EXTENSIONS')
+    except ValueError as e:
+        print >> sys.stderr, " ERROR: %s" % repr(e)
+        sys.exit(1)
+    print " SUCCESS"
+
+    print "openid: Run OpenID Protocol with extensions ...",
+    try:
+        page = sess.fetch_page(idpname,
+                               'http://127.0.0.11:45081/?extensions=YES')
+        page.expected_value('text()', 'SUCCESS, WITH EXTENSIONS')
+    except ValueError as e:
+        print >> sys.stderr, " ERROR: %s" % repr(e)
+        sys.exit(1)
+    print " SUCCESS"
index 1bc69ec..d5e5dd0 100755 (executable)
@@ -65,7 +65,7 @@ def check_info_plugin(s, idp_name, urlbase, expected):
     """
     Logout, login, fetch SP page to get the info variables and
     compare the MELLON_ ones to what we expect.  IDP and NAMEID are
-    ignored. The authtest plugin returns no groups.
+    ignored.
     """
 
     # Log out
@@ -195,6 +195,7 @@ if __name__ == '__main__':
             'surname': user,
             'givenname': 'Test User',
             'email': '%s@example.com' % user,
+            'groups': user,
         }
         check_info_plugin(sess, idpname, spurl, expect)
     except Exception, e:  # pylint: disable=broad-except
@@ -221,6 +222,7 @@ if __name__ == '__main__':
             'surname': user,
             'givenname': 'Test User',
             'email': '%s@example.com' % user,
+            'groups': user
         }
         check_info_plugin(sess, idpname, spurl, expect)
     except Exception, e:  # pylint: disable=broad-except