c02053a9f7d69cef9633b5b69e60db40c6c67ece
[cascardo/ipsilon.git] / tests / testnameid.py
1 #!/usr/bin/python
2 # Copyright (C) 2015 Ipsilon Project Contributors
3
4 from helpers.common import IpsilonTestBase  # pylint: disable=relative-import
5 from helpers.common import WRAP_HOSTNAME  # pylint: disable=relative-import
6 from helpers.common import TESTREALM  # pylint: disable=relative-import
7 from helpers.http import HttpSessions  # pylint: disable=relative-import
8 from ipsilon.tools.saml2metadata import SAML2_NAMEID_MAP
9 import os
10 import pwd
11 import sys
12 import re
13 from string import Template
14
15
16 idp_g = {'TEMPLATES': '${TESTDIR}/templates/install',
17          'CONFDIR': '${TESTDIR}/etc',
18          'DATADIR': '${TESTDIR}/lib',
19          'HTTPDCONFD': '${TESTDIR}/${NAME}/conf.d',
20          'STATICDIR': '${ROOTDIR}',
21          'BINDIR': '${ROOTDIR}/ipsilon',
22          'WSGI_SOCKET_PREFIX': '${TESTDIR}/${NAME}/logs/wsgi'}
23
24
25 idp_a = {'hostname': '${ADDRESS}:${PORT}',
26          'admin_user': '${TEST_USER}',
27          'system_user': '${TEST_USER}',
28          'instance': '${NAME}',
29          'secure': 'no',
30          'testauth': 'yes',
31          'pam': 'no',
32          'gssapi': 'yes',
33          'gssapi_httpd_keytab': '${TESTDIR}/${HTTP_KTNAME}',
34          'ipa': 'no',
35          'server_debugging': 'True'}
36
37
38 sp_g = {'HTTPDCONFD': '${TESTDIR}/${NAME}/conf.d',
39         'SAML2_TEMPLATE': '${TESTDIR}/templates/install/saml2/sp.conf',
40         'SAML2_CONFFILE': '${TESTDIR}/${NAME}/conf.d/ipsilon-saml.conf',
41         'SAML2_HTTPDIR': '${TESTDIR}/${NAME}/saml2'}
42
43
44 sp_a = {'hostname': '${ADDRESS}:${PORT}',
45         'saml_idp_metadata': 'http://%s:45080/idp1/saml2/metadata' %
46         WRAP_HOSTNAME,
47         'saml_secure_setup': 'False',
48         'saml_auth': '/sp',
49         'saml_nameid': '${NAMEID}',
50         'httpd_user': '${TEST_USER}'}
51
52
53 def generate_sp_list():
54     splist = []
55     spport = 45081
56
57     for nameid in SAML2_NAMEID_MAP.keys():
58         nameid = nameid
59         spdata = {'nameid': nameid, 'addr': '127.0.0.11', 'port': str(spport)}
60         splist.append(spdata)
61         spport += 1
62
63     return splist
64
65
66 def get_sp_by_nameid(splist, nameid):
67     for server in splist:
68         if server['nameid'] == nameid:
69             return server
70
71     return None
72
73
74 def convert_to_dict(envlist):
75     values = {}
76     for pair in envlist.split('\n'):
77         if pair.find('=') > 0:
78             (key, value) = pair.split('=', 1)
79             values[key] = value
80     return values
81
82
83 def fixup_sp_httpd(httpdir):
84     location = """
85
86 AddOutputFilter INCLUDES .html
87
88 Alias /sp ${HTTPDIR}/sp
89
90 <Directory ${HTTPDIR}/sp>
91     Require all granted
92     Options +Includes
93 </Directory>
94 """
95     index = """<!--#echo var="REMOTE_USER" -->"""
96
97     t = Template(location)
98     text = t.substitute({'HTTPDIR': httpdir})
99     with open(httpdir + '/conf.d/ipsilon-saml.conf', 'a') as f:
100         f.write(text)
101
102     os.mkdir(httpdir + '/sp')
103     with open(httpdir + '/sp/index.html', 'w') as f:
104         f.write(index)
105
106
107 class IpsilonTest(IpsilonTestBase):
108
109     def __init__(self):
110         super(IpsilonTest, self).__init__('testnameid', __file__)
111
112     def setup_servers(self, env=None):
113         os.mkdir("%s/ccaches" % self.testdir)
114
115         print "Installing KDC server"
116         kdcenv = self.setup_kdc(env)
117
118         print "Creating principals and keytabs"
119         self.setup_keys(kdcenv)
120
121         print "Getting a TGT"
122         self.kinit_keytab(kdcenv)
123
124         print "Installing IDP server"
125         name = 'idp1'
126         addr = WRAP_HOSTNAME
127         port = '45080'
128         idp = self.generate_profile(idp_g, idp_a, name, addr, port)
129         conf = self.setup_idp_server(idp, name, addr, port, env)
130
131         print "Starting IDP's httpd server"
132         env.update(kdcenv)
133         self.start_http_server(conf, env)
134
135         for spdata in generate_sp_list():
136             nameid = spdata['nameid']
137             addr = spdata['addr']
138             port = spdata['port']
139             print "Installing SP server %s" % nameid
140             sp_prof = self.generate_profile(
141                 sp_g, sp_a, nameid, addr, str(port), nameid
142             )
143             conf = self.setup_sp_server(sp_prof, nameid, addr, str(port), env)
144             fixup_sp_httpd(os.path.dirname(conf))
145
146             print "Starting SP's httpd server"
147             self.start_http_server(conf, env)
148
149
150 if __name__ == '__main__':
151
152     idpname = 'idp1'
153     user = pwd.getpwuid(os.getuid())[0]
154
155     expected = {
156         'x509':        False,   # not supported
157         'transient':   True,
158         'persistent':  True,
159         'windows':     False,   # not supported
160         'encrypted':   False,   # not supported
161         'kerberos':    True,
162         'email':       True,
163         'unspecified': True,
164         'entity':      False,   # not supported
165     }
166
167     expected_re = {
168         'x509':        'Unauthorized',    # not supported
169         'transient':   '_[0-9a-f]{32}',
170         'persistent':  '_[0-9a-f]{128}',
171         'windows':     'Unauthorized',    # not supported
172         'encrypted':   'Unauthorized',    # not supported
173         'kerberos':    '%s@%s' % (user, TESTREALM),
174         'email':       '%s@.*' % user,
175         'unspecified': user,
176         'entity':      'Unauthorized',   # not supported
177     }
178
179     testdir = os.environ['TESTDIR']
180
181     krb5conf = os.path.join(testdir, 'krb5.conf')
182     kenv = {'PATH': '/sbin:/bin:/usr/sbin:/usr/bin',
183             'KRB5_CONFIG': krb5conf,
184             'KRB5CCNAME': 'FILE:' + os.path.join(testdir, 'ccaches/user')}
185
186     for kkey in kenv:
187         os.environ[kkey] = kenv[kkey]
188
189     sp_list = generate_sp_list()
190     for sp in sp_list:
191         krb = False
192         spname = sp['nameid']
193         spurl = 'http://%s:%s' % (sp['addr'], sp['port'])
194         sess = HttpSessions()
195         sess.add_server(idpname, 'http://%s:45080' % WRAP_HOSTNAME, user,
196                         'ipsilon')
197         sess.add_server(spname, spurl)
198
199         print ""
200         print "testnameid: Testing NameID format %s ..." % spname
201
202         if spname == 'kerberos':
203             krb = True
204
205         print "testnameid: Authenticate to IDP ...",
206         try:
207             sess.auth_to_idp(idpname, krb=krb)
208         except Exception, e:  # pylint: disable=broad-except
209             print >> sys.stderr, " ERROR: %s" % repr(e)
210             sys.exit(1)
211         print " SUCCESS"
212
213         print "testnameid: Add SP Metadata to IDP ...",
214         try:
215             sess.add_sp_metadata(idpname, spname)
216         except Exception, e:  # pylint: disable=broad-except
217             print >> sys.stderr, " ERROR: %s" % repr(e)
218             sys.exit(1)
219         print " SUCCESS"
220
221         print "testnameid: Set supported Name ID formats ...",
222         try:
223             sess.set_sp_default_nameids(idpname, spname, [spname])
224         except Exception, e:  # pylint: disable=broad-except
225             print >> sys.stderr, " ERROR: %s" % repr(e)
226             sys.exit(1)
227         print " SUCCESS"
228
229         print "testnameid: Access SP Protected Area ...",
230         try:
231             page = sess.fetch_page(idpname, '%s/sp/' % spurl)
232             if not re.match(expected_re[spname], page.text):
233                 raise ValueError(
234                     'page did not contain expression %s' %
235                     expected_re[spname]
236                 )
237         except ValueError, e:
238             if expected[spname]:
239                 print >> sys.stderr, " ERROR: %s" % repr(e)
240                 sys.exit(1)
241             print " OK, EXPECTED TO FAIL"
242         else:
243             print " SUCCESS"
244
245         print "testnameid: Try authentication failure ...",
246         newsess = HttpSessions()
247         newsess.add_server(idpname, 'http://%s:45080' % WRAP_HOSTNAME,
248                            user, 'wrong')
249         try:
250             newsess.auth_to_idp(idpname)
251             print >> sys.stderr, " ERROR: Authentication should have failed"
252             sys.exit(1)
253         except Exception, e:  # pylint: disable=broad-except
254             print " SUCCESS"
255
256     # Ensure that transient names change with each authentication
257     sp = get_sp_by_nameid(sp_list, 'transient')
258     spname = sp['nameid']
259     spurl = 'http://%s:%s' % (sp['addr'], sp['port'])
260
261     print ""
262     print "testnameid: Testing NameID format %s ..." % spname
263
264     ids = []
265     for i in xrange(4):
266         sess = HttpSessions()
267         sess.add_server(idpname, 'http://%s:45080' % WRAP_HOSTNAME,
268                         user, 'ipsilon')
269         sess.add_server(spname, spurl)
270         print "testnameid: Authenticate to IDP ...",
271         try:
272             sess.auth_to_idp(idpname)
273         except Exception, e:  # pylint: disable=broad-except
274             print >> sys.stderr, " ERROR: %s" % repr(e)
275             sys.exit(1)
276         else:
277             print " SUCCESS"
278
279         print "testnameid: Access SP ...",
280         try:
281             page = sess.fetch_page(idpname, '%s/sp/' % spurl)
282             t1 = page.text
283         except ValueError, e:
284             print >> sys.stderr, " ERROR: %s" % repr(e)
285             sys.exit(1)
286         else:
287             print " SUCCESS"
288
289         print "testnameid: Access SP again ...",
290         try:
291             page = sess.fetch_page(idpname, '%s/sp/' % spurl)
292             t2 = page.text
293         except ValueError, e:
294             print >> sys.stderr, " ERROR: %s" % repr(e)
295             sys.exit(1)
296         else:
297             print " SUCCESS"
298
299         print "testnameid: Ensure ID is consistent between requests ...",
300         if t1 != t2:
301             print >> sys.stderr, " ERROR: New ID between reqeusts"
302         else:
303             print " SUCCESS"
304
305         ids.append(t1)
306
307     print "testnameid: Ensure uniqueness across sessions ...",
308     if len(ids) != len(set(ids)):
309         print >> sys.stderr, " ERROR: IDs are not unique between sessions"
310         sys.exit(1)
311     else:
312         print " SUCCESS"
313
314     # Ensure that persistent names remain the same with each authentication
315     sp = get_sp_by_nameid(sp_list, 'persistent')
316     spname = sp['nameid']
317     spurl = 'http://%s:%s' % (sp['addr'], sp['port'])
318
319     print ""
320     print "testnameid: Testing NameID format %s ..." % spname
321
322     ids = []
323     for i in xrange(4):
324         sess = HttpSessions()
325         sess.add_server(idpname, 'http://%s:45080' % WRAP_HOSTNAME,
326                         user, 'ipsilon')
327         sess.add_server(spname, spurl)
328         print "testnameid: Authenticate to IDP ...",
329         try:
330             sess.auth_to_idp(idpname)
331         except Exception, e:  # pylint: disable=broad-except
332             print >> sys.stderr, " ERROR: %s" % repr(e)
333             sys.exit(1)
334         else:
335             print " SUCCESS"
336
337         print "testnameid: Access SP ...",
338         try:
339             page = sess.fetch_page(idpname, '%s/sp/' % spurl)
340             t1 = page.text
341         except ValueError, e:
342             print >> sys.stderr, " ERROR: %s" % repr(e)
343             sys.exit(1)
344         else:
345             print " SUCCESS"
346
347         print "testnameid: Access SP again ...",
348         try:
349             page = sess.fetch_page(idpname, '%s/sp/' % spurl)
350             t2 = page.text
351         except ValueError, e:
352             print >> sys.stderr, " ERROR: %s" % repr(e)
353             sys.exit(1)
354         else:
355             print " SUCCESS"
356
357         print "testnameid: Ensure ID is consistent between requests ...",
358         if t1 != t2:
359             print >> sys.stderr, " ERROR: New ID between reqeusts"
360         else:
361             print " SUCCESS"
362
363         ids.append(t1)
364
365     print "testnameid: Ensure same ID across sessions ...",
366     if len(set(ids)) != 1:
367         print >> sys.stderr, " ERROR: IDs are not the same between sessions"
368         sys.exit(1)
369     else:
370         print " SUCCESS"