2 # Copyright (C) 2015 Ipsilon Project Contributors
4 from helpers.common import IpsilonTestBase # pylint: disable=relative-import
5 from helpers.common import WRAP_HOSTNAME # pylint: disable=relative-import
6 from helpers.common import TESTREALM # pylint: disable=relative-import
7 from helpers.http import HttpSessions # pylint: disable=relative-import
8 from ipsilon.tools.saml2metadata import SAML2_NAMEID_MAP
13 from string import Template
16 idp_g = {'TEMPLATES': '${TESTDIR}/templates/install',
17 'CONFDIR': '${TESTDIR}/etc',
18 'DATADIR': '${TESTDIR}/lib',
19 'HTTPDCONFD': '${TESTDIR}/${NAME}/conf.d',
20 'STATICDIR': '${ROOTDIR}',
21 'BINDIR': '${ROOTDIR}/ipsilon',
22 'WSGI_SOCKET_PREFIX': '${TESTDIR}/${NAME}/logs/wsgi'}
25 idp_a = {'hostname': '${ADDRESS}:${PORT}',
26 'admin_user': '${TEST_USER}',
27 'system_user': '${TEST_USER}',
28 'instance': '${NAME}',
33 'gssapi_httpd_keytab': '${TESTDIR}/${HTTP_KTNAME}',
35 'server_debugging': 'True'}
38 sp_g = {'HTTPDCONFD': '${TESTDIR}/${NAME}/conf.d',
39 'SAML2_TEMPLATE': '${TESTDIR}/templates/install/saml2/sp.conf',
40 'SAML2_CONFFILE': '${TESTDIR}/${NAME}/conf.d/ipsilon-saml.conf',
41 'SAML2_HTTPDIR': '${TESTDIR}/${NAME}/saml2'}
44 sp_a = {'hostname': '${ADDRESS}:${PORT}',
45 'saml_idp_metadata': 'http://%s:45080/idp1/saml2/metadata' %
47 'saml_secure_setup': 'False',
49 'saml_nameid': '${NAMEID}',
50 'httpd_user': '${TEST_USER}'}
53 def generate_sp_list():
57 for nameid in SAML2_NAMEID_MAP.keys():
59 spdata = {'nameid': nameid, 'addr': '127.0.0.11', 'port': str(spport)}
66 def get_sp_by_nameid(splist, nameid):
68 if server['nameid'] == nameid:
74 def convert_to_dict(envlist):
76 for pair in envlist.split('\n'):
77 if pair.find('=') > 0:
78 (key, value) = pair.split('=', 1)
83 def fixup_sp_httpd(httpdir):
86 AddOutputFilter INCLUDES .html
88 Alias /sp ${HTTPDIR}/sp
90 <Directory ${HTTPDIR}/sp>
95 index = """<!--#echo var="REMOTE_USER" -->"""
97 t = Template(location)
98 text = t.substitute({'HTTPDIR': httpdir})
99 with open(httpdir + '/conf.d/ipsilon-saml.conf', 'a') as f:
102 os.mkdir(httpdir + '/sp')
103 with open(httpdir + '/sp/index.html', 'w') as f:
107 class IpsilonTest(IpsilonTestBase):
110 super(IpsilonTest, self).__init__('testnameid', __file__)
112 def setup_servers(self, env=None):
113 os.mkdir("%s/ccaches" % self.testdir)
115 print "Installing KDC server"
116 kdcenv = self.setup_kdc(env)
118 print "Creating principals and keytabs"
119 self.setup_keys(kdcenv)
121 print "Getting a TGT"
122 self.kinit_keytab(kdcenv)
124 print "Installing IDP server"
128 idp = self.generate_profile(idp_g, idp_a, name, addr, port)
129 conf = self.setup_idp_server(idp, name, addr, port, env)
131 print "Starting IDP's httpd server"
133 self.start_http_server(conf, env)
135 for spdata in generate_sp_list():
136 nameid = spdata['nameid']
137 addr = spdata['addr']
138 port = spdata['port']
139 print "Installing SP server %s" % nameid
140 sp_prof = self.generate_profile(
141 sp_g, sp_a, nameid, addr, str(port), nameid
143 conf = self.setup_sp_server(sp_prof, nameid, addr, str(port), env)
144 fixup_sp_httpd(os.path.dirname(conf))
146 print "Starting SP's httpd server"
147 self.start_http_server(conf, env)
150 if __name__ == '__main__':
153 user = pwd.getpwuid(os.getuid())[0]
156 'x509': False, # not supported
159 'windows': False, # not supported
160 'encrypted': False, # not supported
164 'entity': False, # not supported
168 'x509': 'Unauthorized', # not supported
169 'transient': '_[0-9a-f]{32}',
170 'persistent': '_[0-9a-f]{128}',
171 'windows': 'Unauthorized', # not supported
172 'encrypted': 'Unauthorized', # not supported
173 'kerberos': '%s@%s' % (user, TESTREALM),
174 'email': '%s@.*' % user,
176 'entity': 'Unauthorized', # not supported
179 testdir = os.environ['TESTDIR']
181 krb5conf = os.path.join(testdir, 'krb5.conf')
182 kenv = {'PATH': '/sbin:/bin:/usr/sbin:/usr/bin',
183 'KRB5_CONFIG': krb5conf,
184 'KRB5CCNAME': 'FILE:' + os.path.join(testdir, 'ccaches/user')}
187 os.environ[kkey] = kenv[kkey]
189 sp_list = generate_sp_list()
192 spname = sp['nameid']
193 spurl = 'http://%s:%s' % (sp['addr'], sp['port'])
194 sess = HttpSessions()
195 sess.add_server(idpname, 'http://%s:45080' % WRAP_HOSTNAME, user,
197 sess.add_server(spname, spurl)
200 print "testnameid: Testing NameID format %s ..." % spname
202 if spname == 'kerberos':
205 print "testnameid: Authenticate to IDP ...",
207 sess.auth_to_idp(idpname, krb=krb)
208 except Exception, e: # pylint: disable=broad-except
209 print >> sys.stderr, " ERROR: %s" % repr(e)
213 print "testnameid: Add SP Metadata to IDP ...",
215 sess.add_sp_metadata(idpname, spname)
216 except Exception, e: # pylint: disable=broad-except
217 print >> sys.stderr, " ERROR: %s" % repr(e)
221 print "testnameid: Set supported Name ID formats ...",
223 sess.set_sp_default_nameids(idpname, spname, [spname])
224 except Exception, e: # pylint: disable=broad-except
225 print >> sys.stderr, " ERROR: %s" % repr(e)
229 print "testnameid: Access SP Protected Area ...",
231 page = sess.fetch_page(idpname, '%s/sp/' % spurl)
232 if not re.match(expected_re[spname], page.text):
234 'page did not contain expression %s' %
237 except ValueError, e:
239 print >> sys.stderr, " ERROR: %s" % repr(e)
241 print " OK, EXPECTED TO FAIL"
245 print "testnameid: Try authentication failure ...",
246 newsess = HttpSessions()
247 newsess.add_server(idpname, 'http://%s:45080' % WRAP_HOSTNAME,
250 newsess.auth_to_idp(idpname)
251 print >> sys.stderr, " ERROR: Authentication should have failed"
253 except Exception, e: # pylint: disable=broad-except
256 # Ensure that transient names change with each authentication
257 sp = get_sp_by_nameid(sp_list, 'transient')
258 spname = sp['nameid']
259 spurl = 'http://%s:%s' % (sp['addr'], sp['port'])
262 print "testnameid: Testing NameID format %s ..." % spname
266 sess = HttpSessions()
267 sess.add_server(idpname, 'http://%s:45080' % WRAP_HOSTNAME,
269 sess.add_server(spname, spurl)
270 print "testnameid: Authenticate to IDP ...",
272 sess.auth_to_idp(idpname)
273 except Exception, e: # pylint: disable=broad-except
274 print >> sys.stderr, " ERROR: %s" % repr(e)
279 print "testnameid: Access SP ...",
281 page = sess.fetch_page(idpname, '%s/sp/' % spurl)
283 except ValueError, e:
284 print >> sys.stderr, " ERROR: %s" % repr(e)
289 print "testnameid: Access SP again ...",
291 page = sess.fetch_page(idpname, '%s/sp/' % spurl)
293 except ValueError, e:
294 print >> sys.stderr, " ERROR: %s" % repr(e)
299 print "testnameid: Ensure ID is consistent between requests ...",
301 print >> sys.stderr, " ERROR: New ID between reqeusts"
307 print "testnameid: Ensure uniqueness across sessions ...",
308 if len(ids) != len(set(ids)):
309 print >> sys.stderr, " ERROR: IDs are not unique between sessions"
314 # Ensure that persistent names remain the same with each authentication
315 sp = get_sp_by_nameid(sp_list, 'persistent')
316 spname = sp['nameid']
317 spurl = 'http://%s:%s' % (sp['addr'], sp['port'])
320 print "testnameid: Testing NameID format %s ..." % spname
324 sess = HttpSessions()
325 sess.add_server(idpname, 'http://%s:45080' % WRAP_HOSTNAME,
327 sess.add_server(spname, spurl)
328 print "testnameid: Authenticate to IDP ...",
330 sess.auth_to_idp(idpname)
331 except Exception, e: # pylint: disable=broad-except
332 print >> sys.stderr, " ERROR: %s" % repr(e)
337 print "testnameid: Access SP ...",
339 page = sess.fetch_page(idpname, '%s/sp/' % spurl)
341 except ValueError, e:
342 print >> sys.stderr, " ERROR: %s" % repr(e)
347 print "testnameid: Access SP again ...",
349 page = sess.fetch_page(idpname, '%s/sp/' % spurl)
351 except ValueError, e:
352 print >> sys.stderr, " ERROR: %s" % repr(e)
357 print "testnameid: Ensure ID is consistent between requests ...",
359 print >> sys.stderr, " ERROR: New ID between reqeusts"
365 print "testnameid: Ensure same ID across sessions ...",
366 if len(set(ids)) != 1:
367 print >> sys.stderr, " ERROR: IDs are not the same between sessions"