# Copyright (C) 2015 Ipsilon project Contributors, for license see COPYING
+from cherrypy import config as cherrypy_config
from ipsilon.util.log import Log
from ipsilon.util.data import SAML2SessionStore
+import datetime
+from lasso import (
+ SAML2_METADATA_BINDING_SOAP,
+ SAML2_METADATA_BINDING_REDIRECT,
+)
LOGGED_IN = 1
INIT_LOGOUT = 2
logout response will include an InResponseTo value
which matches this.
logout_request - the Logout request object
+ expiration_time - the time the login session expires
+ supported_logout_mechs - logout mechanisms supported by this session
"""
def __init__(self, uuidval, session_id, provider_id, user,
login_session, logoutstate=None, relaystate=None,
- logout_request=None, request_id=None):
+ logout_request=None, request_id=None,
+ expiration_time=None,
+ supported_logout_mechs=None):
self.uuidval = uuidval
self.session_id = session_id
self.relaystate = relaystate
self.request_id = request_id
self.logout_request = logout_request
+ self.expiration_time = expiration_time
+ if supported_logout_mechs is None:
+ supported_logout_mechs = []
+ self.supported_logout_mechs = supported_logout_mechs
def set_logoutstate(self, relaystate=None, request=None, request_id=None):
"""
self.debug('provider_id %s' % self.provider_id)
self.debug('login session %s' % self.login_session)
self.debug('logoutstate %s' % self.logoutstate)
+ self.debug('logout mech %s' % self.supported_logout_mechs)
def convert(self):
"""
data['relaystate'] = self.relaystate
data['logout_request'] = self.logout_request
data['request_id'] = self.request_id
+ data['expiration_time'] = self.expiration_time
return {self.uuidval: data}
Returns a SAMLSession object representing the new session.
"""
- def __init__(self):
- self._ss = SAML2SessionStore()
+ def __init__(self, database_url):
+ self._ss = SAML2SessionStore(database_url=database_url)
self.user = None
def _data_to_samlsession(self, uuidval, data):
data.get('logoutstate'),
data.get('relaystate'),
data.get('logout_request'),
- data.get('request_id'))
+ data.get('request_id'),
+ data.get('expiration_time'),
+ data.get('supported_logout_mechs'))
def add_session(self, session_id, provider_id, user, login_session,
- request_id=None):
+ request_id, supported_logout_mechs):
"""
Add a new login session to the table.
+
+ :param session_id: The login session ID
+ :param provider_id: The URL of the SP
+ :param user: The NameID username
+ :param login_session: The lasso Login session
+ :param request_id: The request ID of the Logout
+ :param supported_logout_mechs: A list of logout protocols supported
"""
self.user = user
+ timeout = cherrypy_config['tools.sessions.timeout']
+ t = datetime.timedelta(seconds=timeout * 60)
+ expiration_time = datetime.datetime.now() + t
+
data = {'session_id': session_id,
'provider_id': provider_id,
'user': user,
'login_session': login_session,
- 'logoutstate': LOGGED_IN}
- if request_id:
- data['request_id'] = request_id
+ 'logoutstate': LOGGED_IN,
+ 'expiration_time': expiration_time,
+ 'request_id': request_id,
+ 'supported_logout_mechs': supported_logout_mechs}
uuidval = self._ss.new_session(data)
return SAMLSession(uuidval, session_id, provider_id, user,
login_session, LOGGED_IN,
- request_id=request_id)
+ request_id=request_id,
+ expiration_time=expiration_time)
def get_session_by_id(self, session_id):
"""
datum = samlsession.convert()
self._ss.update_session(datum)
- def get_next_logout(self, peek=False):
+ def get_next_logout(self, peek=False,
+ logout_mechs=None):
"""
Get the next session in the logged-in state and move
it to the logging_out state. Return the session that is
:param peek: for IdP-initiated logout we can't remove the
session otherwise when the request comes back
in the user won't be seen as being logged-on.
-
- Return None if no more sessions in LOGGED_IN state.
+ :param logout_mechs: An ordered list of logout mechanisms
+ you're looking for. For each mechanism in order
+ loop through all sessions. If If no sessions of
+ this method are available then try the next mechanism
+ until exhausted. In that case None is returned.
+
+ Returns a tuple of (mechanism, session) or
+ (None, None) if no more sessions in LOGGED_IN state.
"""
candidates = self._ss.get_user_sessions(self.user)
-
- for c in candidates:
- key = c.keys()[0]
- if int(c[key].get('logoutstate', 0)) == LOGGED_IN:
- samlsession = self._data_to_samlsession(key, c[key])
- self.start_logout(samlsession, initial=False)
- return samlsession
- return None
+ if logout_mechs is None:
+ logout_mechs = [SAML2_METADATA_BINDING_REDIRECT, ]
+
+ for mech in logout_mechs:
+ for c in candidates:
+ key = c.keys()[0]
+ if ((int(c[key].get('logoutstate', 0)) == LOGGED_IN) and
+ (mech in c[key].get('supported_logout_mechs'))):
+ samlsession = self._data_to_samlsession(key, c[key])
+ self.start_logout(samlsession, initial=False)
+ return (mech, samlsession)
+ return (None, None)
def get_initial_logout(self):
"""
Get the initial logout request.
- Return None if no sessions in INIT_LOGOUT state.
+ Raises ValueError if no sessions in INIT_LOGOUT state.
"""
candidates = self._ss.get_user_sessions(self.user)
if int(c[key].get('logoutstate', 0)) == INIT_LOGOUT:
samlsession = self._data_to_samlsession(key, c[key])
return samlsession
- return None
+ raise ValueError()
def wipe_data(self):
self._ss.wipe_data()
count += 1
if __name__ == '__main__':
- import cherrypy
-
provider1 = "http://127.0.0.10/saml2"
provider2 = "http://127.0.0.11/saml2"
- # temporary database location for testing
- cherrypy.config['saml2.sessions.db'] = '/tmp/saml2sessions.sqlite'
+ # temporary values to simulate cherrypy
+ cherrypy_config['tools.sessions.timeout'] = 60
- factory = SAMLSessionFactory()
+ factory = SAMLSessionFactory('/tmp/saml2sessions.sqlite')
factory.wipe_data()
- sess1 = factory.add_session('_123456', provider1, "admin", "<Login/>")
- sess2 = factory.add_session('_789012', provider2, "testuser", "<Login/>")
+ sess1 = factory.add_session('_123456', provider1, "admin",
+ "<Login/>", '_1234',
+ [SAML2_METADATA_BINDING_REDIRECT])
+ sess2 = factory.add_session('_789012', provider2, "testuser",
+ "<Login/>", '_7890',
+ [SAML2_METADATA_BINDING_SOAP,
+ SAML2_METADATA_BINDING_REDIRECT])
# Test finding sessions by provider
ids = factory.get_session_id_by_provider_id(provider2)
assert(len(ids) == 1)
- sess3 = factory.add_session('_345678', provider2, "testuser", "<Login/>")
+ sess3 = factory.add_session('_345678', provider2, "testuser",
+ "<Login/>", '_3456',
+ [SAML2_METADATA_BINDING_REDIRECT])
ids = factory.get_session_id_by_provider_id(provider2)
assert(len(ids) == 2)
test2 = factory.get_session_by_id('_789012')
factory.start_logout(test2, initial=True)
- test3 = factory.get_next_logout()
+ (lmech, test3) = factory.get_next_logout()
assert(test3.session_id == '_345678')
test4 = factory.get_initial_logout()