Fix file permissions and remove shebang's
[cascardo/ipsilon.git] / ipsilon / providers / persona / auth.py
1 # Copyright (C) 2014  Ipsilon project Contributors, for licensee see COPYING
2
3 from ipsilon.providers.common import ProviderPageBase
4 from ipsilon.util.trans import Transaction
5 from ipsilon.util.user import UserSession
6
7 import base64
8 import cherrypy
9 import time
10 import json
11 import M2Crypto
12
13
14 class AuthenticateRequest(ProviderPageBase):
15
16     def __init__(self, *args, **kwargs):
17         super(AuthenticateRequest, self).__init__(*args, **kwargs)
18         self.trans = None
19
20     def _preop(self, *args, **kwargs):
21         try:
22             # generate a new id or get current one
23             self.trans = Transaction('persona', **kwargs)
24             if self.trans.cookie.value != self.trans.provider:
25                 self.debug('Invalid transaction, %s != %s' % (
26                            self.trans.cookie.value, self.trans.provider))
27         except Exception, e:  # pylint: disable=broad-except
28             self.debug('Transaction initialization failed: %s' % repr(e))
29             raise cherrypy.HTTPError(400, 'Invalid transaction id')
30
31     def pre_GET(self, *args, **kwargs):
32         self._preop(*args, **kwargs)
33
34     def pre_POST(self, *args, **kwargs):
35         self._preop(*args, **kwargs)
36
37
38 class Sign(AuthenticateRequest):
39
40     def _base64_url_decode(self, inp):
41         inp += '=' * (4 - (len(inp) % 4))
42         return base64.urlsafe_b64decode(inp)
43
44     def _base64_url_encode(self, inp):
45         return base64.urlsafe_b64encode(inp).replace('=', '')
46
47     def _persona_sign(self, email, publicKey, certDuration):
48         self.debug('Signing for %s with duration of %s' % (email,
49                                                            certDuration))
50         header = {'alg': 'RS256'}
51         header = json.dumps(header)
52         header = self._base64_url_encode(header)
53
54         claim = {}
55         # Valid from 10 seconds before now to account for clock skew
56         claim['iat'] = 1000 * int(time.time() - 10)
57         # Validity of at most 24 hours
58         claim['exp'] = 1000 * int(time.time() +
59                                   min(certDuration, 24 * 60 * 60))
60
61         claim['iss'] = self.cfg.issuer_domain
62         claim['public-key'] = json.loads(publicKey)
63         claim['principal'] = {'email': email}
64
65         claim = json.dumps(claim)
66         claim = self._base64_url_encode(claim)
67
68         certificate = '%s.%s' % (header, claim)
69         digest = M2Crypto.EVP.MessageDigest('sha256')
70         digest.update(certificate)
71         signature = self.cfg.key.sign(digest.digest(), 'sha256')
72         signature = self._base64_url_encode(signature)
73         signed_certificate = '%s.%s' % (certificate, signature)
74
75         return signed_certificate
76
77     def _willing_to_sign(self, email, username):
78         for domain in self.cfg.allowed_domains:
79             if email == ('%s@%s' % (username, domain)):
80                 return True
81         return False
82
83     def POST(self, *args, **kwargs):
84         if 'email' not in kwargs or 'publicKey' not in kwargs \
85                 or 'certDuration' not in kwargs or '@' not in kwargs['email']:
86             cherrypy.response.status = 400
87             raise Exception('Invalid request: %s' % kwargs)
88
89         us = UserSession()
90         user = us.get_user()
91
92         if user.is_anonymous:
93             raise cherrypy.HTTPError(401, 'Not signed in')
94
95         if not self._willing_to_sign(kwargs['email'], user.name):
96             self.log('Not willing to sign for %s, logged in as %s' % (
97                 kwargs['email'], user.name))
98             raise cherrypy.HTTPError(403, 'Incorrect user')
99
100         return self._persona_sign(kwargs['email'], kwargs['publicKey'],
101                                   kwargs['certDuration'])
102
103
104 class SignInResult(AuthenticateRequest):
105     def GET(self, *args, **kwargs):
106         user = UserSession().get_user()
107
108         return self._template('persona/signin_result.html',
109                               loggedin=not user.is_anonymous)
110
111
112 class SignIn(AuthenticateRequest):
113     def __init__(self, *args, **kwargs):
114         super(SignIn, self).__init__(*args, **kwargs)
115         self.result = SignInResult(*args, **kwargs)
116         self.trans = None
117
118     def GET(self, *args, **kwargs):
119         username = None
120         domain = None
121         if 'email' in kwargs:
122             if '@' in kwargs['email']:
123                 username, domain = kwargs['email'].split('@', 2)
124                 self.debug('Persona SignIn requested for: %s@%s' % (username,
125                                                                     domain))
126
127         returl = '%s/persona/SignIn/result?%s' % (
128             self.basepath, self.trans.get_GET_arg())
129         data = {'login_return': returl,
130                 'login_target': 'Persona',
131                 'login_username': username}
132         self.trans.store(data)
133         redirect = '%s/login?%s' % (self.basepath,
134                                     self.trans.get_GET_arg())
135         self.debug('Redirecting: %s' % redirect)
136         raise cherrypy.HTTPRedirect(redirect)
137
138
139 class Persona(AuthenticateRequest):
140
141     def __init__(self, *args, **kwargs):
142         super(Persona, self).__init__(*args, **kwargs)
143         self.Sign = Sign(*args, **kwargs)
144         self.SignIn = SignIn(*args, **kwargs)
145         self.trans = None
146
147     def GET(self, *args, **kwargs):
148         user = UserSession().get_user()
149         return self._template('persona/provisioning.html',
150                               loggedin=not user.is_anonymous)