Update Copyright header point to COPYING file
[cascardo/ipsilon.git] / tests / testnameid.py
1 #!/usr/bin/python
2 #
3 # Copyright (C) 2015 Ipsilon project Contributors, for license see COPYING
4
5 from helpers.common import IpsilonTestBase  # pylint: disable=relative-import
6 from helpers.common import WRAP_HOSTNAME  # pylint: disable=relative-import
7 from helpers.common import TESTREALM  # pylint: disable=relative-import
8 from helpers.http import HttpSessions  # pylint: disable=relative-import
9 from ipsilon.tools.saml2metadata import SAML2_NAMEID_MAP
10 import os
11 import pwd
12 import sys
13 import re
14 from string import Template
15
16
17 idp_g = {'TEMPLATES': '${TESTDIR}/templates/install',
18          'CONFDIR': '${TESTDIR}/etc',
19          'DATADIR': '${TESTDIR}/lib',
20          'HTTPDCONFD': '${TESTDIR}/${NAME}/conf.d',
21          'STATICDIR': '${ROOTDIR}',
22          'BINDIR': '${ROOTDIR}/ipsilon',
23          'WSGI_SOCKET_PREFIX': '${TESTDIR}/${NAME}/logs/wsgi'}
24
25
26 idp_a = {'hostname': '${ADDRESS}:${PORT}',
27          'admin_user': '${TEST_USER}',
28          'system_user': '${TEST_USER}',
29          'instance': '${NAME}',
30          'secure': 'no',
31          'testauth': 'yes',
32          'pam': 'no',
33          'gssapi': 'yes',
34          'gssapi_httpd_keytab': '${TESTDIR}/${HTTP_KTNAME}',
35          'ipa': 'no',
36          'server_debugging': 'True'}
37
38
39 sp_g = {'HTTPDCONFD': '${TESTDIR}/${NAME}/conf.d',
40         'SAML2_TEMPLATE': '${TESTDIR}/templates/install/saml2/sp.conf',
41         'SAML2_CONFFILE': '${TESTDIR}/${NAME}/conf.d/ipsilon-saml.conf',
42         'SAML2_HTTPDIR': '${TESTDIR}/${NAME}/saml2'}
43
44
45 sp_a = {'hostname': '${ADDRESS}:${PORT}',
46         'saml_idp_metadata': 'http://%s:45080/idp1/saml2/metadata' %
47         WRAP_HOSTNAME,
48         'saml_secure_setup': 'False',
49         'saml_auth': '/sp',
50         'saml_nameid': '${NAMEID}',
51         'httpd_user': '${TEST_USER}'}
52
53
54 def generate_sp_list():
55     splist = []
56     spport = 45081
57
58     for nameid in SAML2_NAMEID_MAP.keys():
59         nameid = nameid
60         spdata = {'nameid': nameid, 'addr': '127.0.0.11', 'port': str(spport)}
61         splist.append(spdata)
62         spport += 1
63
64     return splist
65
66
67 def get_sp_by_nameid(splist, nameid):
68     for server in splist:
69         if server['nameid'] == nameid:
70             return server
71
72     return None
73
74
75 def convert_to_dict(envlist):
76     values = {}
77     for pair in envlist.split('\n'):
78         if pair.find('=') > 0:
79             (key, value) = pair.split('=', 1)
80             values[key] = value
81     return values
82
83
84 def fixup_sp_httpd(httpdir):
85     location = """
86
87 AddOutputFilter INCLUDES .html
88
89 Alias /sp ${HTTPDIR}/sp
90
91 <Directory ${HTTPDIR}/sp>
92     Require all granted
93     Options +Includes
94 </Directory>
95 """
96     index = """<!--#echo var="REMOTE_USER" -->"""
97
98     t = Template(location)
99     text = t.substitute({'HTTPDIR': httpdir})
100     with open(httpdir + '/conf.d/ipsilon-saml.conf', 'a') as f:
101         f.write(text)
102
103     os.mkdir(httpdir + '/sp')
104     with open(httpdir + '/sp/index.html', 'w') as f:
105         f.write(index)
106
107
108 class IpsilonTest(IpsilonTestBase):
109
110     def __init__(self):
111         super(IpsilonTest, self).__init__('testnameid', __file__)
112
113     def setup_servers(self, env=None):
114         os.mkdir("%s/ccaches" % self.testdir)
115
116         print "Installing KDC server"
117         kdcenv = self.setup_kdc(env)
118
119         print "Creating principals and keytabs"
120         self.setup_keys(kdcenv)
121
122         print "Getting a TGT"
123         self.kinit_keytab(kdcenv)
124
125         print "Installing IDP server"
126         name = 'idp1'
127         addr = WRAP_HOSTNAME
128         port = '45080'
129         idp = self.generate_profile(idp_g, idp_a, name, addr, port)
130         conf = self.setup_idp_server(idp, name, addr, port, env)
131
132         print "Starting IDP's httpd server"
133         env.update(kdcenv)
134         self.start_http_server(conf, env)
135
136         for spdata in generate_sp_list():
137             nameid = spdata['nameid']
138             addr = spdata['addr']
139             port = spdata['port']
140             print "Installing SP server %s" % nameid
141             sp_prof = self.generate_profile(
142                 sp_g, sp_a, nameid, addr, str(port), nameid
143             )
144             conf = self.setup_sp_server(sp_prof, nameid, addr, str(port), env)
145             fixup_sp_httpd(os.path.dirname(conf))
146
147             print "Starting SP's httpd server"
148             self.start_http_server(conf, env)
149
150
151 if __name__ == '__main__':
152
153     idpname = 'idp1'
154     user = pwd.getpwuid(os.getuid())[0]
155
156     expected = {
157         'x509':        False,   # not supported
158         'transient':   True,
159         'persistent':  True,
160         'windows':     False,   # not supported
161         'encrypted':   False,   # not supported
162         'kerberos':    True,
163         'email':       True,
164         'unspecified': True,
165         'entity':      False,   # not supported
166     }
167
168     expected_re = {
169         'x509':        'Unauthorized',    # not supported
170         'transient':   '_[0-9a-f]{32}',
171         'persistent':  '_[0-9a-f]{128}',
172         'windows':     'Unauthorized',    # not supported
173         'encrypted':   'Unauthorized',    # not supported
174         'kerberos':    '%s@%s' % (user, TESTREALM),
175         'email':       '%s@.*' % user,
176         'unspecified': user,
177         'entity':      'Unauthorized',   # not supported
178     }
179
180     testdir = os.environ['TESTDIR']
181
182     krb5conf = os.path.join(testdir, 'krb5.conf')
183     kenv = {'PATH': '/sbin:/bin:/usr/sbin:/usr/bin',
184             'KRB5_CONFIG': krb5conf,
185             'KRB5CCNAME': 'FILE:' + os.path.join(testdir, 'ccaches/user')}
186
187     for kkey in kenv:
188         os.environ[kkey] = kenv[kkey]
189
190     sp_list = generate_sp_list()
191     for sp in sp_list:
192         krb = False
193         spname = sp['nameid']
194         spurl = 'http://%s:%s' % (sp['addr'], sp['port'])
195         sess = HttpSessions()
196         sess.add_server(idpname, 'http://%s:45080' % WRAP_HOSTNAME, user,
197                         'ipsilon')
198         sess.add_server(spname, spurl)
199
200         print ""
201         print "testnameid: Testing NameID format %s ..." % spname
202
203         if spname == 'kerberos':
204             krb = True
205
206         print "testnameid: Authenticate to IDP ...",
207         try:
208             sess.auth_to_idp(idpname, krb=krb)
209         except Exception, e:  # pylint: disable=broad-except
210             print >> sys.stderr, " ERROR: %s" % repr(e)
211             sys.exit(1)
212         print " SUCCESS"
213
214         print "testnameid: Add SP Metadata to IDP ...",
215         try:
216             sess.add_sp_metadata(idpname, spname)
217         except Exception, e:  # pylint: disable=broad-except
218             print >> sys.stderr, " ERROR: %s" % repr(e)
219             sys.exit(1)
220         print " SUCCESS"
221
222         print "testnameid: Set supported Name ID formats ...",
223         try:
224             sess.set_sp_default_nameids(idpname, spname, [spname])
225         except Exception, e:  # pylint: disable=broad-except
226             print >> sys.stderr, " ERROR: %s" % repr(e)
227             sys.exit(1)
228         print " SUCCESS"
229
230         print "testnameid: Access SP Protected Area ...",
231         try:
232             page = sess.fetch_page(idpname, '%s/sp/' % spurl)
233             if not re.match(expected_re[spname], page.text):
234                 raise ValueError(
235                     'page did not contain expression %s' %
236                     expected_re[spname]
237                 )
238         except ValueError, e:
239             if expected[spname]:
240                 print >> sys.stderr, " ERROR: %s" % repr(e)
241                 sys.exit(1)
242             print " OK, EXPECTED TO FAIL"
243         else:
244             print " SUCCESS"
245
246         print "testnameid: Try authentication failure ...",
247         newsess = HttpSessions()
248         newsess.add_server(idpname, 'http://%s:45080' % WRAP_HOSTNAME,
249                            user, 'wrong')
250         try:
251             newsess.auth_to_idp(idpname)
252             print >> sys.stderr, " ERROR: Authentication should have failed"
253             sys.exit(1)
254         except Exception, e:  # pylint: disable=broad-except
255             print " SUCCESS"
256
257     # Ensure that transient names change with each authentication
258     sp = get_sp_by_nameid(sp_list, 'transient')
259     spname = sp['nameid']
260     spurl = 'http://%s:%s' % (sp['addr'], sp['port'])
261
262     print ""
263     print "testnameid: Testing NameID format %s ..." % spname
264
265     ids = []
266     for i in xrange(4):
267         sess = HttpSessions()
268         sess.add_server(idpname, 'http://%s:45080' % WRAP_HOSTNAME,
269                         user, 'ipsilon')
270         sess.add_server(spname, spurl)
271         print "testnameid: Authenticate to IDP ...",
272         try:
273             sess.auth_to_idp(idpname)
274         except Exception, e:  # pylint: disable=broad-except
275             print >> sys.stderr, " ERROR: %s" % repr(e)
276             sys.exit(1)
277         else:
278             print " SUCCESS"
279
280         print "testnameid: Access SP ...",
281         try:
282             page = sess.fetch_page(idpname, '%s/sp/' % spurl)
283             t1 = page.text
284         except ValueError, e:
285             print >> sys.stderr, " ERROR: %s" % repr(e)
286             sys.exit(1)
287         else:
288             print " SUCCESS"
289
290         print "testnameid: Access SP again ...",
291         try:
292             page = sess.fetch_page(idpname, '%s/sp/' % spurl)
293             t2 = page.text
294         except ValueError, e:
295             print >> sys.stderr, " ERROR: %s" % repr(e)
296             sys.exit(1)
297         else:
298             print " SUCCESS"
299
300         print "testnameid: Ensure ID is consistent between requests ...",
301         if t1 != t2:
302             print >> sys.stderr, " ERROR: New ID between reqeusts"
303         else:
304             print " SUCCESS"
305
306         ids.append(t1)
307
308     print "testnameid: Ensure uniqueness across sessions ...",
309     if len(ids) != len(set(ids)):
310         print >> sys.stderr, " ERROR: IDs are not unique between sessions"
311         sys.exit(1)
312     else:
313         print " SUCCESS"
314
315     # Ensure that persistent names remain the same with each authentication
316     sp = get_sp_by_nameid(sp_list, 'persistent')
317     spname = sp['nameid']
318     spurl = 'http://%s:%s' % (sp['addr'], sp['port'])
319
320     print ""
321     print "testnameid: Testing NameID format %s ..." % spname
322
323     ids = []
324     for i in xrange(4):
325         sess = HttpSessions()
326         sess.add_server(idpname, 'http://%s:45080' % WRAP_HOSTNAME,
327                         user, 'ipsilon')
328         sess.add_server(spname, spurl)
329         print "testnameid: Authenticate to IDP ...",
330         try:
331             sess.auth_to_idp(idpname)
332         except Exception, e:  # pylint: disable=broad-except
333             print >> sys.stderr, " ERROR: %s" % repr(e)
334             sys.exit(1)
335         else:
336             print " SUCCESS"
337
338         print "testnameid: Access SP ...",
339         try:
340             page = sess.fetch_page(idpname, '%s/sp/' % spurl)
341             t1 = page.text
342         except ValueError, e:
343             print >> sys.stderr, " ERROR: %s" % repr(e)
344             sys.exit(1)
345         else:
346             print " SUCCESS"
347
348         print "testnameid: Access SP again ...",
349         try:
350             page = sess.fetch_page(idpname, '%s/sp/' % spurl)
351             t2 = page.text
352         except ValueError, e:
353             print >> sys.stderr, " ERROR: %s" % repr(e)
354             sys.exit(1)
355         else:
356             print " SUCCESS"
357
358         print "testnameid: Ensure ID is consistent between requests ...",
359         if t1 != t2:
360             print >> sys.stderr, " ERROR: New ID between reqeusts"
361         else:
362             print " SUCCESS"
363
364         ids.append(t1)
365
366     print "testnameid: Ensure same ID across sessions ...",
367     if len(set(ids)) != 1:
368         print >> sys.stderr, " ERROR: IDs are not the same between sessions"
369         sys.exit(1)
370     else:
371         print " SUCCESS"