Add OpenID test suite
[cascardo/ipsilon.git] / tests / blobs / openid_app.py
diff --git a/tests/blobs/openid_app.py b/tests/blobs/openid_app.py
new file mode 100644 (file)
index 0000000..db80bbd
--- /dev/null
@@ -0,0 +1,114 @@
+# Copyright (C) 2015  Ipsilon project Contributors, for licensee see COPYING
+import sys
+sys.stdout = sys.stderr
+
+import cherrypy
+import os
+import pwd
+
+from openid.consumer import consumer
+from openid.extensions import sreg, ax
+from openid_teams import teams
+
+
+class OpenIDApp(object):
+    def index(self, extensions):
+        self.extensions = extensions == 'YES'
+        oidconsumer = consumer.Consumer(dict(), None)
+        try:
+            request = oidconsumer.begin('http://127.0.0.10:45080/idp1/')
+        except Exception as ex:
+            return 'ERROR: %s' % ex
+
+        if request is None:
+            return 'ERROR: No request'
+
+        # Attach extensions here
+        if self.extensions:
+            request.addExtension(sreg.SRegRequest(
+                required=['nickname', 'email', 'timezone']))
+            ax_req = ax.FetchRequest()
+            ax_req_name = ax.AttrInfo('http://schema.openid.net/namePerson')
+            ax_req.add(ax_req_name)
+            request.addExtension(ax_req)
+            username = pwd.getpwuid(os.getuid())[0]
+            request.addExtension(teams.TeamsRequest(requested=[username]))
+
+        # Build and send final request
+        trust_root = cherrypy.url()
+        return_to = trust_root + 'finish'
+        if request.shouldSendRedirect():
+            redirect_url = request.redirectURL(
+                trust_root, return_to)
+            raise cherrypy.HTTPRedirect(redirect_url)
+        else:
+            return request.htmlMarkup(
+                trust_root, return_to)
+    index.exposed = True
+
+    def finish(self, **args):
+        oidconsumer = consumer.Consumer(dict(), None)
+        info = oidconsumer.complete(cherrypy.request.params, cherrypy.url())
+        display_identifier = info.getDisplayIdentifier()
+
+        if info.status == consumer.FAILURE and display_identifier:
+            return 'ERROR:Verification of %s failed: %s' % (
+                display_identifier, info.message)
+        elif info.status == consumer.CANCEL:
+            return 'ERROR: Cancelled'
+        elif info.status == consumer.SUCCESS:
+            username = pwd.getpwuid(os.getuid())[0]
+            expected_identifier = 'http://127.0.0.10:45080/idp1/openid/id/%s/'\
+                % username
+            if expected_identifier != display_identifier:
+                return 'ERROR: Wrong id returned: %s != %s' % (
+                    expected_identifier,
+                    display_identifier)
+
+            if self.extensions:
+                sreg_resp = sreg.SRegResponse.fromSuccessResponse(info)
+                teams_resp = teams.TeamsResponse.fromSuccessResponse(info)
+                ax_resp = ax.FetchResponse.fromSuccessResponse(info)
+
+                if sreg_resp is None:
+                    return 'ERROR: No sreg!'
+                elif teams_resp is None:
+                    return 'ERROR: No teams!'
+                elif ax_resp is None:
+                    return 'ERROR: No AX!'
+
+                # Check values
+                expected_name = 'Test User %s' % username
+                expected_email = '%s@example.com' % username
+
+                ax_name = ax_resp.data[
+                    'http://schema.openid.net/namePerson'][0]
+                sreg_email = sreg_resp.data['email']
+
+                if ax_name != expected_name:
+                    return 'ERROR: Wrong name returned: %s != %s' % (
+                        expected_name,
+                        ax_name)
+
+                if sreg_email != expected_email:
+                    return 'ERROR: Wrong email returned: %s != %s' % (
+                        expected_email,
+                        sreg_email)
+
+                if username not in teams_resp.teams:
+                    return 'ERROR: User not in self-named group (%s not in %s)' %\
+                        (username, teams_resp.teams)
+
+            if self.extensions:
+                return 'SUCCESS, WITH EXTENSIONS'
+            else:
+                return 'SUCCESS, WITHOUT EXTENSIONS'
+        else:
+            return 'ERROR: Strange error: %s' % info.message
+    finish.exposed = True
+
+
+cherrypy.config['environment'] = 'embedded'
+
+application = cherrypy.Application(OpenIDApp(),
+                                   script_name=None, config=None)