1 # Copyright (C) 2014 Simo Sorce <simo@redhat.com>
3 # see file 'COPYING' for use and warranty information
5 # This program is free software; you can redistribute it and/or modify
6 # it under the terms of the GNU General Public License as published by
7 # the Free Software Foundation, either version 3 of the License, or
8 # (at your option) any later version.
10 # This program is distributed in the hope that it will be useful,
11 # but WITHOUT ANY WARRANTY; without even the implied warranty of
12 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 # GNU General Public License for more details.
15 # You should have received a copy of the GNU General Public License
16 # along with this program. If not, see <http://www.gnu.org/licenses/>.
18 from ipsilon.providers.common import ProviderException
19 from ipsilon.util import config as pconfig
20 from ipsilon.util.config import ConfigHelper
21 from ipsilon.tools.saml2metadata import SAML2_NAMEID_MAP
22 from ipsilon.util.log import Log
27 VALID_IN_NAME = r'[^\ a-zA-Z0-9]'
30 class InvalidProviderId(ProviderException):
32 def __init__(self, code):
33 message = 'Invalid Provider ID: %s' % code
34 super(InvalidProviderId, self).__init__(message)
38 class NameIdNotAllowed(Exception):
40 def __init__(self, nid):
41 message = 'Name ID [%s] is not allowed' % nid
42 super(NameIdNotAllowed, self).__init__(message)
43 self.message = message
46 return repr(self.message)
49 class ServiceProviderConfig(ConfigHelper):
51 super(ServiceProviderConfig, self).__init__()
54 class ServiceProvider(ServiceProviderConfig):
56 def __init__(self, config, provider_id):
57 super(ServiceProvider, self).__init__()
59 data = self.cfg.get_data(name='id', value=provider_id)
61 raise InvalidProviderId('multiple matches')
62 idval = data.keys()[0]
63 data = self.cfg.get_data(idval=idval)
64 self._properties = data[idval]
65 self._staging = dict()
68 def load_config(self):
73 'A nickname used to easily identify the Service Provider.'
74 ' Only alphanumeric characters [A-Z,a-z,0-9] and spaces are'
79 'Default NameID used by Service Providers.',
80 SAML2_NAMEID_MAP.keys(),
84 'Allowed NameIDs for this Service Provider.',
85 SAML2_NAMEID_MAP.keys(),
86 self.allowed_nameids),
89 'The user that owns this Service Provider',
93 'Defines how to map attributes before returning them to'
94 ' the SP. Setting this overrides the global values.',
95 self.attribute_mappings),
98 'Defines a list of allowed attributes, applied after mapping.'
99 ' Setting this overrides the global values.',
100 self.allowed_attributes),
104 def provider_id(self):
105 return self._properties['id']
109 return self._properties['name']
112 def name(self, value):
113 self._staging['name'] = value
117 if 'owner' in self._properties:
118 return self._properties['owner']
123 def owner(self, value):
124 self._staging['owner'] = value
127 def allowed_nameids(self):
128 if 'allowed nameids' in self._properties:
129 allowed = self._properties['allowed nameids']
130 return [x.strip() for x in allowed.split(',')]
132 return self.cfg.default_allowed_nameids
134 @allowed_nameids.setter
135 def allowed_nameids(self, value):
136 if type(value) is not list:
137 raise ValueError("Must be a list")
138 self._staging['allowed nameids'] = ','.join(value)
141 def default_nameid(self):
142 if 'default nameid' in self._properties:
143 return self._properties['default nameid']
145 return self.cfg.default_nameid
147 @default_nameid.setter
148 def default_nameid(self, value):
149 self._staging['default nameid'] = value
152 def attribute_mappings(self):
153 if 'attribute mappings' in self._properties:
154 attr_map = pconfig.MappingList('temp', 'temp', None)
155 attr_map.import_value(str(self._properties['attribute mappings']))
156 return attr_map.get_value()
160 @attribute_mappings.setter
161 def attribute_mappings(self, attr_map):
162 if isinstance(attr_map, pconfig.MappingList):
163 value = attr_map.export_value()
165 temp = pconfig.MappingList('temp', 'temp', None)
166 temp.set_value(attr_map)
167 value = temp.export_value()
168 self._staging['attribute mappings'] = value
171 def allowed_attributes(self):
172 if 'allowed_attributes' in self._properties:
173 attr_map = pconfig.ComplexList('temp', 'temp', None)
174 attr_map.import_value(str(self._properties['allowed_attributes']))
175 return attr_map.get_value()
179 @allowed_attributes.setter
180 def allowed_attributes(self, attr_map):
181 if isinstance(attr_map, pconfig.ComplexList):
182 value = attr_map.export_value()
184 temp = pconfig.ComplexList('temp', 'temp', None)
185 temp.set_value(attr_map)
186 value = temp.export_value()
187 self._staging['allowed_attributes'] = value
189 def save_properties(self):
190 data = self.cfg.get_data(name='id', value=self.provider_id)
192 raise InvalidProviderId('Could not find SP data')
193 idval = data.keys()[0]
195 data[idval] = self._staging
196 self.cfg.save_data(data)
197 data = self.cfg.get_data(idval=idval)
198 self._properties = data[idval]
199 self._staging = dict()
201 def refresh_config(self):
203 Create a new config object for displaying in the UI based on
204 the current set of properties.
209 def get_valid_nameid(self, nip):
210 self._debug('Requested NameId [%s]' % (nip.format,))
211 if nip.format is None:
212 return SAML2_NAMEID_MAP[self.default_nameid]
214 allowed = self.allowed_nameids
215 self._debug('Allowed NameIds %s' % (repr(allowed)))
216 for nameid in allowed:
217 if nip.format == SAML2_NAMEID_MAP[nameid]:
219 raise NameIdNotAllowed(nip.format)
221 def permanently_delete(self):
222 data = self.cfg.get_data(name='id', value=self.provider_id)
224 raise InvalidProviderId('Could not find SP data')
225 idval = data.keys()[0]
226 self.cfg.del_datum(idval)
228 def normalize_username(self, username):
229 if 'strip domain' in self._properties:
230 return username.split('@', 1)[0]
233 def is_valid_name(self, value):
234 if re.search(VALID_IN_NAME, value):
238 def is_valid_nameid(self, value):
239 if value in SAML2_NAMEID_MAP:
243 def valid_nameids(self):
244 return SAML2_NAMEID_MAP.keys()
247 class ServiceProviderCreator(object):
249 def __init__(self, config):
252 def create_from_buffer(self, name, metabuf):
253 '''Test and add data'''
255 if re.search(VALID_IN_NAME, name):
256 raise InvalidProviderId("Name must contain only "
257 "numbers and letters")
259 test = lasso.Server()
260 test.addProviderFromBuffer(lasso.PROVIDER_ROLE_SP, metabuf)
261 newsps = test.get_providers()
263 raise InvalidProviderId("Metadata must contain one Provider")
265 spid = newsps.keys()[0]
266 data = self.cfg.get_data(name='id', value=spid)
268 raise InvalidProviderId("Provider Already Exists")
269 datum = {'id': spid, 'name': name, 'type': 'SP', 'metadata': metabuf}
270 self.cfg.new_datum(datum)
272 data = self.cfg.get_data(name='id', value=spid)
274 raise InvalidProviderId("Internal Error")
275 idval = data.keys()[0]
276 data = self.cfg.get_data(idval=idval)
278 self.cfg.idp.add_provider(sp)
280 return ServiceProvider(self.cfg, spid)
283 class IdentityProvider(Log):
284 def __init__(self, config):
285 self.server = lasso.Server(config.idp_metadata_file,
288 config.idp_certificate_file)
289 self.server.role = lasso.PROVIDER_ROLE_IDP
291 def add_provider(self, sp):
292 self.server.addProviderFromBuffer(lasso.PROVIDER_ROLE_SP,
294 self._debug('Added SP %s' % sp['name'])
296 def get_login_handler(self, dump=None):
298 return lasso.Login.newFromDump(self.server, dump)
300 return lasso.Login(self.server)
302 def get_providers(self):
303 return self.server.get_providers()
305 def get_logout_handler(self, dump=None):
307 return lasso.Logout.newFromDump(self.server, dump)
309 return lasso.Logout(self.server)