3 # Copyright (C) 2014 Ipsilon project Contributors, for licensee see COPYING
5 from ipsilon.providers.common import ProviderPageBase
6 from ipsilon.util.trans import Transaction
7 from ipsilon.util.user import UserSession
16 class AuthenticateRequest(ProviderPageBase):
18 def __init__(self, *args, **kwargs):
19 super(AuthenticateRequest, self).__init__(*args, **kwargs)
22 def _preop(self, *args, **kwargs):
24 # generate a new id or get current one
25 self.trans = Transaction('persona', **kwargs)
26 if self.trans.cookie.value != self.trans.provider:
27 self.debug('Invalid transaction, %s != %s' % (
28 self.trans.cookie.value, self.trans.provider))
29 except Exception, e: # pylint: disable=broad-except
30 self.debug('Transaction initialization failed: %s' % repr(e))
31 raise cherrypy.HTTPError(400, 'Invalid transaction id')
33 def pre_GET(self, *args, **kwargs):
34 self._preop(*args, **kwargs)
36 def pre_POST(self, *args, **kwargs):
37 self._preop(*args, **kwargs)
40 class Sign(AuthenticateRequest):
42 def _base64_url_decode(self, inp):
43 inp += '=' * (4 - (len(inp) % 4))
44 return base64.urlsafe_b64decode(inp)
46 def _base64_url_encode(self, inp):
47 return base64.urlsafe_b64encode(inp).replace('=', '')
49 def _persona_sign(self, email, publicKey, certDuration):
50 self.debug('Signing for %s with duration of %s' % (email,
52 header = {'alg': 'RS256'}
53 header = json.dumps(header)
54 header = self._base64_url_encode(header)
57 # Valid from 10 seconds before now to account for clock skew
58 claim['iat'] = 1000 * int(time.time() - 10)
59 # Validity of at most 24 hours
60 claim['exp'] = 1000 * int(time.time() +
61 min(certDuration, 24 * 60 * 60))
63 claim['iss'] = self.cfg.issuer_domain
64 claim['public-key'] = json.loads(publicKey)
65 claim['principal'] = {'email': email}
67 claim = json.dumps(claim)
68 claim = self._base64_url_encode(claim)
70 certificate = '%s.%s' % (header, claim)
71 digest = M2Crypto.EVP.MessageDigest('sha256')
72 digest.update(certificate)
73 signature = self.cfg.key.sign(digest.digest(), 'sha256')
74 signature = self._base64_url_encode(signature)
75 signed_certificate = '%s.%s' % (certificate, signature)
77 return signed_certificate
79 def _willing_to_sign(self, email, username):
80 for domain in self.cfg.allowed_domains:
81 if email == ('%s@%s' % (username, domain)):
85 def POST(self, *args, **kwargs):
86 if 'email' not in kwargs or 'publicKey' not in kwargs \
87 or 'certDuration' not in kwargs or '@' not in kwargs['email']:
88 cherrypy.response.status = 400
89 raise Exception('Invalid request: %s' % kwargs)
95 raise cherrypy.HTTPError(401, 'Not signed in')
97 if not self._willing_to_sign(kwargs['email'], user.name):
98 self.log('Not willing to sign for %s, logged in as %s' % (
99 kwargs['email'], user.name))
100 raise cherrypy.HTTPError(403, 'Incorrect user')
102 return self._persona_sign(kwargs['email'], kwargs['publicKey'],
103 kwargs['certDuration'])
106 class SignInResult(AuthenticateRequest):
107 def GET(self, *args, **kwargs):
108 user = UserSession().get_user()
110 return self._template('persona/signin_result.html',
111 loggedin=not user.is_anonymous)
114 class SignIn(AuthenticateRequest):
115 def __init__(self, *args, **kwargs):
116 super(SignIn, self).__init__(*args, **kwargs)
117 self.result = SignInResult(*args, **kwargs)
120 def GET(self, *args, **kwargs):
123 if 'email' in kwargs:
124 if '@' in kwargs['email']:
125 username, domain = kwargs['email'].split('@', 2)
126 self.debug('Persona SignIn requested for: %s@%s' % (username,
129 returl = '%s/persona/SignIn/result?%s' % (
130 self.basepath, self.trans.get_GET_arg())
131 data = {'login_return': returl,
132 'login_target': 'Persona',
133 'login_username': username}
134 self.trans.store(data)
135 redirect = '%s/login?%s' % (self.basepath,
136 self.trans.get_GET_arg())
137 self.debug('Redirecting: %s' % redirect)
138 raise cherrypy.HTTPRedirect(redirect)
141 class Persona(AuthenticateRequest):
143 def __init__(self, *args, **kwargs):
144 super(Persona, self).__init__(*args, **kwargs)
145 self.Sign = Sign(*args, **kwargs)
146 self.SignIn = SignIn(*args, **kwargs)
149 def GET(self, *args, **kwargs):
150 user = UserSession().get_user()
151 return self._template('persona/provisioning.html',
152 loggedin=not user.is_anonymous)