From 86f5401c7cb620046b6dd7730844998dec595f43 Mon Sep 17 00:00:00 2001 From: Patrick Uiterwijk Date: Tue, 28 Apr 2015 19:11:12 +0200 Subject: [PATCH] Add OpenID test suite This tests core OpenID and the Attribute Exchange, Simple Registration and Teams extensions. Using a small wsgi tool because mod_auth_openid does not support all extensions. Signed-off-by: Patrick Uiterwijk Reviewed-by: Rob Crittenden --- Makefile | 1 + ipsilon/login/authtest.py | 3 +- ipsilon/providers/openidp.py | 7 +- tests/blobs/openid_app.py | 114 ++++++++++++++++++++++++++++++ tests/helpers/http.py | 84 ++++++++++++++++++++-- tests/openid.py | 132 +++++++++++++++++++++++++++++++++++ tests/testmapping.py | 4 +- 7 files changed, 336 insertions(+), 9 deletions(-) create mode 100644 tests/blobs/openid_app.py create mode 100755 tests/openid.py diff --git a/Makefile b/Makefile index dce214a..2434898 100644 --- a/Makefile +++ b/Makefile @@ -98,6 +98,7 @@ tests: wrappers PYTHONPATH=./ ./tests/tests.py --test=pgdb PYTHONPATH=./ ./tests/tests.py --test=fconf PYTHONPATH=./ ./tests/tests.py --test=ldap + PYTHONPATH=./ ./tests/tests.py --test=openid test: lp-test unittests tests diff --git a/ipsilon/login/authtest.py b/ipsilon/login/authtest.py index 7fc4160..5f0ff6e 100644 --- a/ipsilon/login/authtest.py +++ b/ipsilon/login/authtest.py @@ -36,7 +36,8 @@ class TestAuth(LoginFormBase): 'givenname': 'Test User', 'surname': username, 'fullname': 'Test User %s' % username, - 'email': '%s@example.com' % username + 'email': '%s@example.com' % username, + '_groups': [username] } return self.lm.auth_successful(self.trans, username, 'password', testdata) diff --git a/ipsilon/providers/openidp.py b/ipsilon/providers/openidp.py index 4e47d3e..032c406 100644 --- a/ipsilon/providers/openidp.py +++ b/ipsilon/providers/openidp.py @@ -143,6 +143,8 @@ class Installer(ProviderInstaller): help='Configure OpenID Provider') group.add_argument('--openid-dburi', help='OpenID database URI') + group.add_argument('--openid-extensions', default='', + help='List of OpenID Extensions to enable') def configure(self, opts): if opts['openid'] != 'yes': @@ -160,10 +162,11 @@ class Installer(ProviderInstaller): po.wipe_data() po.wipe_config_values() config = {'endpoint url': url, - 'identity_url_template': '%sid/%%(username)s' % url, + 'identity url template': '%sid/%%(username)s' % url, 'database url': opts['openid_dburi'] or opts['database_url'] % { - 'datadir': opts['data_dir'], 'dbname': 'openid'}} + 'datadir': opts['data_dir'], 'dbname': 'openid'}, + 'enabled extensions': opts['openid_extensions']} po.save_plugin_config(config) # Update global config to add login plugin diff --git a/tests/blobs/openid_app.py b/tests/blobs/openid_app.py new file mode 100644 index 0000000..db80bbd --- /dev/null +++ b/tests/blobs/openid_app.py @@ -0,0 +1,114 @@ +# Copyright (C) 2015 Ipsilon project Contributors, for licensee see COPYING +import sys +sys.stdout = sys.stderr + +import cherrypy +import os +import pwd + +from openid.consumer import consumer +from openid.extensions import sreg, ax +from openid_teams import teams + + +class OpenIDApp(object): + def index(self, extensions): + self.extensions = extensions == 'YES' + oidconsumer = consumer.Consumer(dict(), None) + try: + request = oidconsumer.begin('http://127.0.0.10:45080/idp1/') + except Exception as ex: + return 'ERROR: %s' % ex + + if request is None: + return 'ERROR: No request' + + # Attach extensions here + if self.extensions: + request.addExtension(sreg.SRegRequest( + required=['nickname', 'email', 'timezone'])) + ax_req = ax.FetchRequest() + ax_req_name = ax.AttrInfo('http://schema.openid.net/namePerson') + ax_req.add(ax_req_name) + request.addExtension(ax_req) + username = pwd.getpwuid(os.getuid())[0] + request.addExtension(teams.TeamsRequest(requested=[username])) + + # Build and send final request + trust_root = cherrypy.url() + return_to = trust_root + 'finish' + if request.shouldSendRedirect(): + redirect_url = request.redirectURL( + trust_root, return_to) + raise cherrypy.HTTPRedirect(redirect_url) + else: + return request.htmlMarkup( + trust_root, return_to) + index.exposed = True + + def finish(self, **args): + oidconsumer = consumer.Consumer(dict(), None) + info = oidconsumer.complete(cherrypy.request.params, cherrypy.url()) + display_identifier = info.getDisplayIdentifier() + + if info.status == consumer.FAILURE and display_identifier: + return 'ERROR:Verification of %s failed: %s' % ( + display_identifier, info.message) + elif info.status == consumer.CANCEL: + return 'ERROR: Cancelled' + elif info.status == consumer.SUCCESS: + username = pwd.getpwuid(os.getuid())[0] + expected_identifier = 'http://127.0.0.10:45080/idp1/openid/id/%s/'\ + % username + if expected_identifier != display_identifier: + return 'ERROR: Wrong id returned: %s != %s' % ( + expected_identifier, + display_identifier) + + if self.extensions: + sreg_resp = sreg.SRegResponse.fromSuccessResponse(info) + teams_resp = teams.TeamsResponse.fromSuccessResponse(info) + ax_resp = ax.FetchResponse.fromSuccessResponse(info) + + if sreg_resp is None: + return 'ERROR: No sreg!' + elif teams_resp is None: + return 'ERROR: No teams!' + elif ax_resp is None: + return 'ERROR: No AX!' + + # Check values + expected_name = 'Test User %s' % username + expected_email = '%s@example.com' % username + + ax_name = ax_resp.data[ + 'http://schema.openid.net/namePerson'][0] + sreg_email = sreg_resp.data['email'] + + if ax_name != expected_name: + return 'ERROR: Wrong name returned: %s != %s' % ( + expected_name, + ax_name) + + if sreg_email != expected_email: + return 'ERROR: Wrong email returned: %s != %s' % ( + expected_email, + sreg_email) + + if username not in teams_resp.teams: + return 'ERROR: User not in self-named group (%s not in %s)' %\ + (username, teams_resp.teams) + + if self.extensions: + return 'SUCCESS, WITH EXTENSIONS' + else: + return 'SUCCESS, WITHOUT EXTENSIONS' + else: + return 'ERROR: Strange error: %s' % info.message + finish.exposed = True + + +cherrypy.config['environment'] = 'embedded' + +application = cherrypy.Application(OpenIDApp(), + script_name=None, config=None) diff --git a/tests/helpers/http.py b/tests/helpers/http.py index 2478e2a..ff696d4 100755 --- a/tests/helpers/http.py +++ b/tests/helpers/http.py @@ -113,14 +113,17 @@ class HttpSessions(object): return action def get_form_data(self, page, form_id, input_fields): + form_selector = '//form' + if form_id: + form_selector += '[@id="%s"]' % form_id values = [] - action = page.first_value('//form[@id="%s"]/@action' % form_id) + action = page.first_value('%s/@action' % form_selector) values.append(action) - method = page.first_value('//form[@id="%s"]/@method' % form_id) + method = page.first_value('%s/@method' % form_selector) values.append(method) for field in input_fields: - value = page.all_values('//form[@id="%s"]/input/@%s' % (form_id, - field)) + value = page.all_values('%s/input/@%s' % (form_selector, + field)) values.append(value) return values @@ -180,6 +183,65 @@ class HttpSessions(object): return [method, self.new_url(referer, action_url), {'headers': headers, 'data': payload}] + def handle_openid_form(self, page): + if type(page) != PageTree: + raise TypeError("Expected PageTree object") + + if not page.first_value('//title/text()') == \ + 'OpenID transaction in progress': + raise WrongPage('Not OpenID autosubmit form') + + try: + results = self.get_form_data(page, None, + ["name", "value"]) + action_url = results[0] + if action_url is None: + raise Exception + method = results[1] + names = results[2] + values = results[3] + except Exception: # pylint: disable=broad-except + raise WrongPage("Not OpenID autosubmit form") + + referer = page.make_referer() + headers = {'referer': referer} + + payload = {} + for i in range(0, len(names)): + payload[names[i]] = values[i] + + return [method, self.new_url(referer, action_url), + {'headers': headers, 'data': payload}] + + def handle_openid_consent_form(self, page): + if type(page) != PageTree: + raise TypeError("Expected PageTree object") + + try: + results = self.get_form_data(page, "consent_form", + ['name', 'value']) + action_url = results[0] + if action_url is None: + raise Exception + method = results[1] + names = results[2] + values = results[3] + except Exception: # pylint: disable=broad-except + raise WrongPage("Not an OpenID Consent Form Page") + + referer = page.make_referer() + headers = {'referer': referer} + + payload = {} + for i in range(0, len(names)): + payload[names[i]] = values[i] + + # Replace known values + payload['decided_allow'] = 'Allow' + + return [method, self.new_url(referer, action_url), + {'headers': headers, 'data': payload}] + def fetch_page(self, idp, target_url, follow_redirect=True): url = target_url action = 'get' @@ -187,7 +249,7 @@ class HttpSessions(object): while True: r = self.access(action, url, **args) # pylint: disable=star-args - if r.status_code == 303: + if r.status_code == 303 or r.status_code == 302: if not follow_redirect: return PageTree(r) url = r.headers['location'] @@ -208,6 +270,18 @@ class HttpSessions(object): except WrongPage: pass + try: + (action, url, args) = self.handle_openid_consent_form(page) + continue + except WrongPage: + pass + + try: + (action, url, args) = self.handle_openid_form(page) + continue + except WrongPage: + pass + # Either we got what we wanted, or we have to stop anyway return page else: diff --git a/tests/openid.py b/tests/openid.py new file mode 100755 index 0000000..ebc92ba --- /dev/null +++ b/tests/openid.py @@ -0,0 +1,132 @@ +#!/usr/bin/python +# +# Copyright (C) 2014 Simo Sorce +# +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +from helpers.common import IpsilonTestBase # pylint: disable=relative-import +from helpers.http import HttpSessions # pylint: disable=relative-import +import os +import pwd +import sys +import inspect +from string import Template + +idp_g = {'TEMPLATES': '${TESTDIR}/templates/install', + 'CONFDIR': '${TESTDIR}/etc', + 'DATADIR': '${TESTDIR}/lib', + 'HTTPDCONFD': '${TESTDIR}/${NAME}/conf.d', + 'STATICDIR': '${ROOTDIR}', + 'BINDIR': '${ROOTDIR}/ipsilon', + 'WSGI_SOCKET_PREFIX': '${TESTDIR}/${NAME}/logs/wsgi'} + + +idp_a = {'hostname': '${ADDRESS}:${PORT}', + 'admin_user': '${TEST_USER}', + 'system_user': '${TEST_USER}', + 'instance': '${NAME}', + 'secure': 'no', + 'testauth': 'yes', + 'openid': 'yes', + 'openid_extensions': 'Attribute Exchange,Simple Registration,Teams', + 'pam': 'no', + 'krb': 'no', + 'ipa': 'no', + 'server_debugging': 'True'} + + +def fixup_sp_httpd(httpdir, testdir): + client_wsgi = """ + +WSGIScriptAlias / ${TESTDIR}/blobs/openid_app.py + + + Require all granted + +""" + t = Template(client_wsgi) + text = t.substitute({'TESTDIR': testdir}) + with open(httpdir + '/conf.d/ipsilon-openid-client.conf', 'a') as f: + f.write(text) + + +class IpsilonTest(IpsilonTestBase): + + def __init__(self): + super(IpsilonTest, self).__init__('openid', __file__) + + def setup_servers(self, env=None): + print "Installing IDP server" + name = 'idp1' + addr = '127.0.0.10' + port = '45080' + idp = self.generate_profile(idp_g, idp_a, name, addr, port) + conf = self.setup_idp_server(idp, name, addr, port, env) + + print "Starting IDP's httpd server" + self.start_http_server(conf, env) + + print "Installing first SP server" + name = 'sp1' + addr = '127.0.0.11' + port = '45081' + conf = self.setup_http(name, addr, port) + testdir = os.path.dirname(os.path.abspath(inspect.getfile( + inspect.currentframe()))) + fixup_sp_httpd(os.path.dirname(conf), testdir) + + print "Starting SP's httpd server" + self.start_http_server(conf, env) + + +if __name__ == '__main__': + + idpname = 'idp1' + sp1name = 'sp1' + user = pwd.getpwuid(os.getuid())[0] + + sess = HttpSessions() + sess.add_server(idpname, 'http://127.0.0.10:45080', user, 'ipsilon') + sess.add_server(sp1name, 'http://127.0.0.11:45081') + + print "openid: Authenticate to IDP ...", + try: + sess.auth_to_idp(idpname) + except Exception as e: # pylint: disable=broad-except + print >> sys.stderr, " ERROR: %s" % repr(e) + sys.exit(1) + print " SUCCESS" + + print "openid: Run OpenID Protocol ...", + try: + page = sess.fetch_page(idpname, + 'http://127.0.0.11:45081/?extensions=NO') + page.expected_value('text()', 'SUCCESS, WITHOUT EXTENSIONS') + except ValueError as e: + print >> sys.stderr, " ERROR: %s" % repr(e) + sys.exit(1) + print " SUCCESS" + + print "openid: Run OpenID Protocol with extensions ...", + try: + page = sess.fetch_page(idpname, + 'http://127.0.0.11:45081/?extensions=YES') + page.expected_value('text()', 'SUCCESS, WITH EXTENSIONS') + except ValueError as e: + print >> sys.stderr, " ERROR: %s" % repr(e) + sys.exit(1) + print " SUCCESS" diff --git a/tests/testmapping.py b/tests/testmapping.py index 1bc69ec..d5e5dd0 100755 --- a/tests/testmapping.py +++ b/tests/testmapping.py @@ -65,7 +65,7 @@ def check_info_plugin(s, idp_name, urlbase, expected): """ Logout, login, fetch SP page to get the info variables and compare the MELLON_ ones to what we expect. IDP and NAMEID are - ignored. The authtest plugin returns no groups. + ignored. """ # Log out @@ -195,6 +195,7 @@ if __name__ == '__main__': 'surname': user, 'givenname': 'Test User', 'email': '%s@example.com' % user, + 'groups': user, } check_info_plugin(sess, idpname, spurl, expect) except Exception, e: # pylint: disable=broad-except @@ -221,6 +222,7 @@ if __name__ == '__main__': 'surname': user, 'givenname': 'Test User', 'email': '%s@example.com' % user, + 'groups': user } check_info_plugin(sess, idpname, spurl, expect) except Exception, e: # pylint: disable=broad-except -- 2.20.1