pam: use a pam object method instead of pam module function
[cascardo/ipsilon.git] / ipsilon / providers / persona / auth.py
1 # Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING
2
3 from ipsilon.providers.common import ProviderPageBase
4 from ipsilon.util.user import UserSession
5 from ipsilon.util.endpoint import allow_iframe
6
7 import base64
8 import cherrypy
9 import time
10 import json
11 import M2Crypto
12
13
14 class AuthenticateRequest(ProviderPageBase):
15
16     def __init__(self, site, provider, *args, **kwargs):
17         super(AuthenticateRequest, self).__init__(site, provider)
18         self.trans = None
19
20     def _preop(self, *args, **kwargs):
21         self.trans = self.get_valid_transaction('persona', **kwargs)
22
23     def pre_GET(self, *args, **kwargs):
24         self._preop(*args, **kwargs)
25
26     def pre_POST(self, *args, **kwargs):
27         self._preop(*args, **kwargs)
28
29
30 class Sign(AuthenticateRequest):
31
32     def _base64_url_decode(self, inp):
33         inp += '=' * (4 - (len(inp) % 4))
34         return base64.urlsafe_b64decode(inp)
35
36     def _base64_url_encode(self, inp):
37         return base64.urlsafe_b64encode(inp).replace('=', '')
38
39     def _persona_sign(self, email, publicKey, certDuration):
40         self.debug('Signing for %s with duration of %s' % (email,
41                                                            certDuration))
42         header = {'alg': 'RS256'}
43         header = json.dumps(header)
44         header = self._base64_url_encode(header)
45
46         claim = {}
47         # Valid from 10 seconds before now to account for clock skew
48         claim['iat'] = 1000 * int(time.time() - 10)
49         # Validity of at most 24 hours
50         claim['exp'] = 1000 * int(time.time() +
51                                   min(certDuration, 24 * 60 * 60))
52
53         claim['iss'] = self.cfg.issuer_domain
54         claim['public-key'] = json.loads(publicKey)
55         claim['principal'] = {'email': email}
56
57         claim = json.dumps(claim)
58         claim = self._base64_url_encode(claim)
59
60         certificate = '%s.%s' % (header, claim)
61         digest = M2Crypto.EVP.MessageDigest('sha256')
62         digest.update(certificate)
63         signature = self.cfg.key.sign(digest.digest(), 'sha256')
64         signature = self._base64_url_encode(signature)
65         signed_certificate = '%s.%s' % (certificate, signature)
66
67         return signed_certificate
68
69     def _willing_to_sign(self, email, username):
70         for domain in self.cfg.allowed_domains:
71             if email == ('%s@%s' % (username, domain)):
72                 return True
73         return False
74
75     @allow_iframe
76     def POST(self, *args, **kwargs):
77         if 'email' not in kwargs or 'publicKey' not in kwargs \
78                 or 'certDuration' not in kwargs or '@' not in kwargs['email']:
79             cherrypy.response.status = 400
80             raise Exception('Invalid request: %s' % kwargs)
81
82         us = UserSession()
83         user = us.get_user()
84
85         if user.is_anonymous:
86             raise cherrypy.HTTPError(401, 'Not signed in')
87
88         if not self._willing_to_sign(kwargs['email'], user.name):
89             self.log('Not willing to sign for %s, logged in as %s' % (
90                 kwargs['email'], user.name))
91             raise cherrypy.HTTPError(403, 'Incorrect user')
92
93         return self._persona_sign(kwargs['email'], kwargs['publicKey'],
94                                   kwargs['certDuration'])
95
96
97 class SignInResult(AuthenticateRequest):
98     @allow_iframe
99     def GET(self, *args, **kwargs):
100         user = UserSession().get_user()
101
102         return self._template('persona/signin_result.html',
103                               loggedin=not user.is_anonymous)
104
105
106 class SignIn(AuthenticateRequest):
107     def __init__(self, *args, **kwargs):
108         super(SignIn, self).__init__(*args, **kwargs)
109         self.result = SignInResult(*args, **kwargs)
110         self.trans = None
111
112     @allow_iframe
113     def GET(self, *args, **kwargs):
114         username = None
115         domain = None
116         if 'email' in kwargs:
117             if '@' in kwargs['email']:
118                 username, domain = kwargs['email'].split('@', 2)
119                 self.debug('Persona SignIn requested for: %s@%s' % (username,
120                                                                     domain))
121
122         returl = '%s/persona/SignIn/result?%s' % (
123             self.basepath, self.trans.get_GET_arg())
124         data = {'login_return': returl,
125                 'login_target': 'Persona',
126                 'login_username': username}
127         self.trans.store(data)
128         redirect = '%s/login?%s' % (self.basepath,
129                                     self.trans.get_GET_arg())
130         self.debug('Redirecting: %s' % redirect)
131         raise cherrypy.HTTPRedirect(redirect)
132
133
134 class Persona(AuthenticateRequest):
135
136     def __init__(self, *args, **kwargs):
137         super(Persona, self).__init__(*args, **kwargs)
138         self.Sign = Sign(*args, **kwargs)
139         self.SignIn = SignIn(*args, **kwargs)
140         self.trans = None
141
142     @allow_iframe
143     def GET(self, *args, **kwargs):
144         user = UserSession().get_user()
145         return self._template('persona/provisioning.html',
146                               loggedin=not user.is_anonymous)