3 # Copyright (C) 2014 Simo Sorce <simo@redhat.com>
5 # see file 'COPYING' for use and warranty information
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
20 from ipsilon.providers.common import ProviderException
21 from ipsilon.tools.saml2metadata import SAML2_NAMEID_MAP
22 from ipsilon.util.log import Log
class InvalidProviderId(ProviderException):
    """Raised when a provider id cannot be resolved or accepted."""

    def __init__(self, code):
        """Build the error text from ``code``, a short reason string."""
        message = 'Invalid Provider ID: %s' % code
        super(InvalidProviderId, self).__init__(message)
class NameIdNotAllowed(Exception):
    """Raised when a requested NameID format is not permitted."""

    def __init__(self, nid):
        """Record the rejected NameID format ``nid`` in the error message."""
        message = 'Name ID [%s] is not allowed' % nid
        super(NameIdNotAllowed, self).__init__(message)
        self.message = message

    def __str__(self):
        # Returning a value from __init__ raises TypeError, so the repr of
        # the message is exposed through __str__ instead.
        return repr(self.message)
class ServiceProvider(Log):
    """A SAML2 Service Provider entry held in the configuration store.

    Current values are read from ``_properties``.  Values assigned through
    the property setters are collected in ``_staging`` and are only written
    to the store when ``save_properties()`` is called.
    """

    def __init__(self, config, provider_id):
        """Load the stored properties of the provider ``provider_id``.

        ``config`` is the store object; it must offer ``get_data()``,
        ``save_data()`` and ``del_datum()`` as used by this class.

        Raises InvalidProviderId unless exactly one stored entry matches.
        """
        self.cfg = config
        data = self.cfg.get_data(name='id', value=provider_id)
        if len(data) != 1:
            raise InvalidProviderId('multiple matches')
        # list(...) keeps this working where keys() returns a view.
        idval = list(data.keys())[0]
        data = self.cfg.get_data(idval=idval)
        self._properties = data[idval]
        self._staging = dict()

    def _find_idval(self):
        """Return the store key of this provider's entry.

        Raises InvalidProviderId unless exactly one stored entry matches.
        """
        data = self.cfg.get_data(name='id', value=self.provider_id)
        if len(data) != 1:
            raise InvalidProviderId('Could not find SP data')
        return list(data.keys())[0]

    @property
    def provider_id(self):
        """The stored 'id' property of this provider."""
        return self._properties['id']

    @property
    def name(self):
        """The stored 'name' property of this provider."""
        return self._properties['name']

    @name.setter
    def name(self, value):
        self._staging['name'] = value

    @property
    def owner(self):
        """The stored 'owner' property, or '' when none is recorded."""
        return self._properties.get('owner', '')

    @owner.setter
    def owner(self, value):
        self._staging['owner'] = value

    @property
    def allowed_nameids(self):
        """List of NameID names allowed for this provider.

        The per-provider value is stored as a comma separated string; when
        it is absent the store-wide ``default_allowed_nameids`` is returned.
        """
        if 'allowed nameids' in self._properties:
            allowed = self._properties['allowed nameids']
            return [x.strip() for x in allowed.split(',')]
        return self.cfg.default_allowed_nameids

    @allowed_nameids.setter
    def allowed_nameids(self, value):
        if not isinstance(value, list):
            raise ValueError("Must be a list")
        self._staging['allowed nameids'] = ','.join(value)

    @property
    def default_nameid(self):
        """The provider's default NameID name.

        Falls back to the store-wide ``default_nameid`` when unset.
        """
        if 'default nameid' in self._properties:
            return self._properties['default nameid']
        return self.cfg.default_nameid

    @default_nameid.setter
    def default_nameid(self, value):
        self._staging['default nameid'] = value

    def save_properties(self):
        """Write the staged changes to the store and reload the properties.

        Raises InvalidProviderId if the provider entry cannot be found.
        """
        idval = self._find_idval()
        self.cfg.save_data({idval: self._staging})
        data = self.cfg.get_data(idval=idval)
        self._properties = data[idval]
        self._staging = dict()

    def get_valid_nameid(self, nip):
        """Return the NameID format to use for the policy ``nip``.

        When ``nip.format`` is None or the lasso "unspecified" format, the
        format mapped from ``default_nameid`` is returned.  Otherwise the
        requested format is returned if it maps from one of the allowed
        NameIDs.

        Raises NameIdNotAllowed when the requested format is not allowed.
        """
        self._debug('Requested NameId [%s]' % (nip.format,))
        if nip.format is None:
            return SAML2_NAMEID_MAP[self.default_nameid]
        if nip.format == lasso.SAML2_NAME_IDENTIFIER_FORMAT_UNSPECIFIED:
            return SAML2_NAMEID_MAP[self.default_nameid]

        allowed = self.allowed_nameids
        self._debug('Allowed NameIds %s' % (repr(allowed)))
        for nameid in allowed:
            if nip.format == SAML2_NAMEID_MAP[nameid]:
                return nip.format
        raise NameIdNotAllowed(nip.format)

    def permanently_delete(self):
        """Remove this provider's entry from the store.

        Raises InvalidProviderId if the provider entry cannot be found.
        """
        self.cfg.del_datum(self._find_idval())

    def normalize_username(self, username):
        """Return ``username``, cut at the first '@' if so configured.

        Only the presence of the 'strip domain' property is checked, not
        its value.
        """
        if 'strip domain' in self._properties:
            return username.split('@', 1)[0]
        return username

    def is_valid_nameid(self, value):
        """Return True if ``value`` is a key of SAML2_NAMEID_MAP."""
        return value in SAML2_NAMEID_MAP

    def valid_nameids(self):
        """Return the keys of SAML2_NAMEID_MAP."""
        return SAML2_NAMEID_MAP.keys()
class ServiceProviderCreator(object):
    """Registers new Service Providers in the configuration store."""

    def __init__(self, config):
        """Keep ``config``, the store object used by create_from_buffer()."""
        self.cfg = config

    def create_from_buffer(self, name, metabuf):
        """Test and add data.

        ``metabuf`` is first loaded into a scratch ``lasso.Server`` to check
        that it describes exactly one provider.  It is then stored under
        ``name`` and handed to ``self.cfg.idp.add_provider()``.

        Returns a ServiceProvider for the new entry.

        Raises InvalidProviderId when the metadata does not hold exactly one
        provider, when the provider id is already stored, or when the new
        entry cannot be read back.
        """
        test = lasso.Server()
        test.addProviderFromBuffer(lasso.PROVIDER_ROLE_SP, metabuf)
        newsps = test.get_providers()
        if len(newsps) != 1:
            raise InvalidProviderId("Metadata must contain one Provider")

        # list(...) keeps this working where keys() returns a view.
        spid = list(newsps.keys())[0]
        data = self.cfg.get_data(name='id', value=spid)
        if len(data) != 0:
            raise InvalidProviderId("Provider Already Exists")
        datum = {'id': spid, 'name': name, 'type': 'SP', 'metadata': metabuf}
        self.cfg.new_datum(datum)

        # Read the entry back so the running IdP gets what was stored.
        data = self.cfg.get_data(name='id', value=spid)
        if len(data) != 1:
            raise InvalidProviderId("Internal Error")
        idval = list(data.keys())[0]
        data = self.cfg.get_data(idval=idval)
        sp = data[idval]
        self.cfg.idp.add_provider(sp)

        return ServiceProvider(self.cfg, spid)
class IdentityProvider(Log):
    """Wraps the ``lasso.Server`` object acting in the IdP role."""

    def __init__(self, config):
        """Create the lasso server from the files named in ``config``."""
        # lasso.Server takes (metadata, private key, key password,
        # certificate) positionally, so the certificate must be fourth.
        # NOTE(review): confirm that config exposes the signing key as
        # `idp_key_file` and that the key carries no password.
        self.server = lasso.Server(config.idp_metadata_file,
                                   config.idp_key_file,
                                   None,
                                   config.idp_certificate_file)
        self.server.role = lasso.PROVIDER_ROLE_IDP

    def add_provider(self, sp):
        """Register the Service Provider ``sp`` with the lasso server.

        ``sp`` is a mapping with at least 'metadata' and 'name' entries.
        """
        self.server.addProviderFromBuffer(lasso.PROVIDER_ROLE_SP,
                                          sp['metadata'])
        self._debug('Added SP %s' % sp['name'])

    def get_login_handler(self, dump=None):
        """Return a ``lasso.Login`` bound to this server.

        When ``dump`` is given, the login object is rebuilt from it;
        otherwise a new one is created.
        """
        if dump:
            return lasso.Login.newFromDump(self.server, dump)
        return lasso.Login(self.server)

    def get_providers(self):
        """Return the providers known to the lasso server."""
        return self.server.get_providers()