# Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING
3 from ipsilon.providers.common import ProviderPageBase
4 from ipsilon.util.trans import Transaction
5 from ipsilon.util.user import UserSession
class AuthenticateRequest(ProviderPageBase):
    """Base page that binds each Persona request to a transaction.

    ``pre_GET`` and ``pre_POST`` both delegate to ``_preop``, which stores
    a ``Transaction`` on ``self.trans``.
    """

    def __init__(self, *args, **kwargs):
        super(AuthenticateRequest, self).__init__(*args, **kwargs)
        # Define the attribute up front so it exists before _preop() runs.
        self.trans = None

    def _preop(self, *args, **kwargs):
        """Create or resume the 'persona' transaction for this request.

        The request keyword arguments are forwarded to ``Transaction``.
        A cookie value that differs from the transaction's provider is
        only reported through ``self.debug``; it does not abort the
        request.

        Raises:
            cherrypy.HTTPError: 400 if setting up the transaction raises.
        """
        try:
            # generate a new id or get current one
            self.trans = Transaction('persona', **kwargs)
            if self.trans.cookie.value != self.trans.provider:
                self.debug('Invalid transaction, %s != %s' % (
                    self.trans.cookie.value, self.trans.provider))
        except Exception as e:  # pylint: disable=broad-except
            self.debug('Transaction initialization failed: %s' % repr(e))
            raise cherrypy.HTTPError(400, 'Invalid transaction id')

    def pre_GET(self, *args, **kwargs):
        """Set up the transaction before a GET is dispatched."""
        self._preop(*args, **kwargs)

    def pre_POST(self, *args, **kwargs):
        """Set up the transaction before a POST is dispatched."""
        self._preop(*args, **kwargs)
class Sign(AuthenticateRequest):
    """Endpoint that signs a Persona certificate for the session user."""

    # Upper bound on certificate validity, in seconds (24 hours).
    _MAX_CERT_DURATION = 24 * 60 * 60

    def _base64_url_decode(self, inp):
        """Decode URL-safe base64 text whose '=' padding was stripped."""
        # -len % 4 is 0 for already aligned input, so no spurious '====' block
        # is appended.
        inp += '=' * (-len(inp) % 4)
        return base64.urlsafe_b64decode(inp)

    def _base64_url_encode(self, inp):
        """Return *inp* as URL-safe base64 with the '=' padding removed."""
        return base64.urlsafe_b64encode(inp).replace('=', '')

    def _persona_sign(self, email, publicKey, certDuration):
        """Build and sign a certificate binding *email* to *publicKey*.

        Args:
            email: address placed in the certificate's principal.
            publicKey: JSON text of the key to certify.
            certDuration: requested validity in seconds, as an int or a
                numeric string; capped at 24 hours.

        Returns:
            The string 'header.claim.signature', each part URL-safe
            base64 without padding.

        Raises:
            ValueError: if certDuration is not an integer value or
                publicKey is not valid JSON.
            TypeError: if certDuration cannot be converted by ``int``.
        """
        self.debug('Signing for %s with duration of %s' % (email,
                                                           certDuration))
        # Form values arrive as text.  On Python 2, min() of text and a
        # number always picks the number, which silently ignored the
        # requested duration; convert before capping.
        duration = min(int(certDuration), self._MAX_CERT_DURATION)
        now = time.time()

        header = self._base64_url_encode(json.dumps({'alg': 'RS256'}))

        claim = {
            # Valid from 10 seconds before now to account for clock skew
            'iat': 1000 * int(now - 10),
            # Validity of at most 24 hours
            'exp': 1000 * int(now + duration),
            'iss': self.cfg.issuer_domain,
            'public-key': json.loads(publicKey),
            'principal': {'email': email},
        }
        claim = self._base64_url_encode(json.dumps(claim))

        # The signature covers the SHA-256 digest of 'header.claim'.
        certificate = '%s.%s' % (header, claim)
        digest = M2Crypto.EVP.MessageDigest('sha256')
        digest.update(certificate)
        signature = self.cfg.key.sign(digest.digest(), 'sha256')
        signature = self._base64_url_encode(signature)
        return '%s.%s' % (certificate, signature)

    def _willing_to_sign(self, email, username):
        """Return True if *email* is *username* at an allowed domain."""
        return any(email == '%s@%s' % (username, domain)
                   for domain in self.cfg.allowed_domains)

    def POST(self, *args, **kwargs):
        """Sign a certificate for the posted email if the user owns it.

        Expects 'email', 'publicKey' and 'certDuration' in the request.

        Raises:
            cherrypy.HTTPError: 400 for a malformed request, 401 when
                nobody is signed in, 403 when the email does not belong
                to the signed-in user.
        """
        required = ('email', 'publicKey', 'certDuration')
        malformed = (any(field not in kwargs for field in required)
                     or '@' not in kwargs['email'])
        duration = None
        if not malformed:
            try:
                duration = int(kwargs['certDuration'])
            except (TypeError, ValueError):
                malformed = True
        if malformed:
            # Raise HTTPError so the client really receives a 400; a bare
            # Exception is reported by CherryPy as a 500 regardless of any
            # status set on the response beforehand.
            self.debug('Invalid request: %s' % kwargs)
            raise cherrypy.HTTPError(400, 'Invalid request')

        # NOTE(review): session lookup and anonymous check reconstructed
        # from the surrounding code -- confirm against the upstream file.
        user = UserSession().get_user()
        if user.is_anonymous:
            raise cherrypy.HTTPError(401, 'Not signed in')

        if not self._willing_to_sign(kwargs['email'], user.name):
            self.log('Not willing to sign for %s, logged in as %s' % (
                kwargs['email'], user.name))
            raise cherrypy.HTTPError(403, 'Incorrect user')

        return self._persona_sign(kwargs['email'], kwargs['publicKey'],
                                  duration)
class SignInResult(AuthenticateRequest):
    """Page that reports whether the current session is logged in."""

    def GET(self, *args, **kwargs):
        """Render the sign-in result template with the login state."""
        session_user = UserSession().get_user()
        logged_in = not session_user.is_anonymous
        return self._template('persona/signin_result.html',
                              loggedin=logged_in)
class SignIn(AuthenticateRequest):
    """Entry point that hands a Persona sign-in over to the login page."""

    def __init__(self, *args, **kwargs):
        super(SignIn, self).__init__(*args, **kwargs)
        # Sub-page built with the same constructor arguments.
        self.result = SignInResult(*args, **kwargs)

    def GET(self, *args, **kwargs):
        """Store the login parameters in the transaction and redirect.

        When an 'email' containing '@' is supplied, the part before the
        first '@' is stored as 'login_username'; otherwise None is stored.

        Raises:
            cherrypy.HTTPRedirect: always, pointing at the login page.
        """
        username = None
        email = kwargs.get('email')
        if email is not None and '@' in email:
            # maxsplit=1 always yields exactly two parts; a larger limit
            # lets an address with a second '@' break the unpacking.
            username, domain = email.split('@', 1)
            self.debug('Persona SignIn requested for: %s@%s' % (username,
                                                                domain))

        returl = '%s/persona/SignIn/result?%s' % (
            self.basepath, self.trans.get_GET_arg())
        data = {'login_return': returl,
                'login_target': 'Persona',
                'login_username': username}
        self.trans.store(data)
        redirect = '%s/login?%s' % (self.basepath,
                                    self.trans.get_GET_arg())
        self.debug('Redirecting: %s' % redirect)
        raise cherrypy.HTTPRedirect(redirect)
class Persona(AuthenticateRequest):
    """Top-level Persona page; holds the Sign and SignIn sub-pages."""

    def __init__(self, *args, **kwargs):
        super(Persona, self).__init__(*args, **kwargs)
        # Both sub-pages receive the same constructor arguments.
        self.Sign = Sign(*args, **kwargs)
        self.SignIn = SignIn(*args, **kwargs)

    def GET(self, *args, **kwargs):
        """Render the provisioning template with the login state."""
        session_user = UserSession().get_user()
        logged_in = not session_user.is_anonymous
        return self._template('persona/provisioning.html',
                              loggedin=logged_in)