pam: use a pam object method instead of pam module function
[cascardo/ipsilon.git] / tests / testnameid.py
1 #!/usr/bin/python
2 #
3 # Copyright (C) 2015 Ipsilon project Contributors, for license see COPYING
4
5 from helpers.common import IpsilonTestBase  # pylint: disable=relative-import
6 from helpers.common import WRAP_HOSTNAME  # pylint: disable=relative-import
7 from helpers.common import TESTREALM  # pylint: disable=relative-import
8 from helpers.http import HttpSessions  # pylint: disable=relative-import
9 from ipsilon.tools.saml2metadata import SAML2_NAMEID_MAP
10 import os
11 import pwd
12 import sys
13 import re
14 from string import Template
15
16
17 idp_g = {'TEMPLATES': '${TESTDIR}/templates/install',
18          'CONFDIR': '${TESTDIR}/etc',
19          'DATADIR': '${TESTDIR}/lib',
20          'CACHEDIR': '${TESTDIR}/cache',
21          'HTTPDCONFD': '${TESTDIR}/${NAME}/conf.d',
22          'STATICDIR': '${ROOTDIR}',
23          'BINDIR': '${ROOTDIR}/ipsilon',
24          'WSGI_SOCKET_PREFIX': '${TESTDIR}/${NAME}/logs/wsgi'}
25
26
27 idp_a = {'hostname': '${ADDRESS}:${PORT}',
28          'admin_user': '${TEST_USER}',
29          'system_user': '${TEST_USER}',
30          'instance': '${NAME}',
31          'secure': 'no',
32          'testauth': 'yes',
33          'pam': 'no',
34          'gssapi': 'yes',
35          'gssapi_httpd_keytab': '${TESTDIR}/${HTTP_KTNAME}',
36          'ipa': 'no',
37          'server_debugging': 'True'}
38
39
40 sp_g = {'HTTPDCONFD': '${TESTDIR}/${NAME}/conf.d',
41         'SAML2_TEMPLATE': '${TESTDIR}/templates/install/saml2/sp.conf',
42         'SAML2_CONFFILE': '${TESTDIR}/${NAME}/conf.d/ipsilon-saml.conf',
43         'SAML2_HTTPDIR': '${TESTDIR}/${NAME}/saml2'}
44
45
46 sp_a = {'hostname': '${ADDRESS}:${PORT}',
47         'saml_idp_metadata': 'http://%s:45080/idp1/saml2/metadata' %
48         WRAP_HOSTNAME,
49         'saml_secure_setup': 'False',
50         'saml_auth': '/sp',
51         'saml_nameid': '${NAMEID}',
52         'httpd_user': '${TEST_USER}'}
53
54
55 def generate_sp_list():
56     splist = []
57     spport = 45081
58
59     for nameid in SAML2_NAMEID_MAP:
60         nameid = nameid
61         spdata = {'nameid': nameid, 'addr': '127.0.0.11', 'port': str(spport)}
62         splist.append(spdata)
63         spport += 1
64
65     return splist
66
67
68 def get_sp_by_nameid(splist, nameid):
69     for server in splist:
70         if server['nameid'] == nameid:
71             return server
72
73     return None
74
75
76 def convert_to_dict(envlist):
77     values = {}
78     for pair in envlist.split('\n'):
79         if pair.find('=') > 0:
80             (key, value) = pair.split('=', 1)
81             values[key] = value
82     return values
83
84
85 def fixup_sp_httpd(httpdir):
86     location = """
87
88 AddOutputFilter INCLUDES .html
89
90 Alias /sp ${HTTPDIR}/sp
91
92 <Directory ${HTTPDIR}/sp>
93     Require all granted
94     Options +Includes
95 </Directory>
96 """
97     index = """<!--#echo var="REMOTE_USER" -->"""
98
99     t = Template(location)
100     text = t.substitute({'HTTPDIR': httpdir})
101     with open(httpdir + '/conf.d/ipsilon-saml.conf', 'a') as f:
102         f.write(text)
103
104     os.mkdir(httpdir + '/sp')
105     with open(httpdir + '/sp/index.html', 'w') as f:
106         f.write(index)
107
108
109 class IpsilonTest(IpsilonTestBase):
110
111     def __init__(self):
112         super(IpsilonTest, self).__init__('testnameid', __file__)
113
114     def setup_servers(self, env=None):
115         os.mkdir("%s/ccaches" % self.testdir)
116
117         print "Installing KDC server"
118         kdcenv = self.setup_kdc(env)
119
120         print "Creating principals and keytabs"
121         self.setup_keys(kdcenv)
122
123         print "Getting a TGT"
124         self.kinit_keytab(kdcenv)
125
126         print "Installing IDP server"
127         name = 'idp1'
128         addr = WRAP_HOSTNAME
129         port = '45080'
130         idp = self.generate_profile(idp_g, idp_a, name, addr, port)
131         conf = self.setup_idp_server(idp, name, addr, port, env)
132
133         print "Starting IDP's httpd server"
134         env.update(kdcenv)
135         self.start_http_server(conf, env)
136
137         for spdata in generate_sp_list():
138             nameid = spdata['nameid']
139             addr = spdata['addr']
140             port = spdata['port']
141             print "Installing SP server %s" % nameid
142             sp_prof = self.generate_profile(
143                 sp_g, sp_a, nameid, addr, str(port), nameid
144             )
145             conf = self.setup_sp_server(sp_prof, nameid, addr, str(port), env)
146             fixup_sp_httpd(os.path.dirname(conf))
147
148             print "Starting SP's httpd server"
149             self.start_http_server(conf, env)
150
151
152 if __name__ == '__main__':
153
154     idpname = 'idp1'
155     user = pwd.getpwuid(os.getuid())[0]
156
157     expected = {
158         'x509':        False,   # not supported
159         'transient':   True,
160         'persistent':  True,
161         'windows':     False,   # not supported
162         'encrypted':   False,   # not supported
163         'kerberos':    True,
164         'email':       True,
165         'unspecified': True,
166         'entity':      False,   # not supported
167     }
168
169     expected_re = {
170         'x509':        'Unauthorized',    # not supported
171         'transient':   '_[0-9a-f]{32}',
172         'persistent':  '_[0-9a-f]{128}',
173         'windows':     'Unauthorized',    # not supported
174         'encrypted':   'Unauthorized',    # not supported
175         'kerberos':    '%s@%s' % (user, TESTREALM),
176         'email':       '%s@.*' % user,
177         'unspecified': user,
178         'entity':      'Unauthorized',   # not supported
179     }
180
181     testdir = os.environ['TESTDIR']
182
183     krb5conf = os.path.join(testdir, 'krb5.conf')
184     kenv = {'PATH': '/sbin:/bin:/usr/sbin:/usr/bin',
185             'KRB5_CONFIG': krb5conf,
186             'KRB5CCNAME': 'FILE:' + os.path.join(testdir, 'ccaches/user')}
187
188     for kkey in kenv:
189         os.environ[kkey] = kenv[kkey]
190
191     sp_list = generate_sp_list()
192     for sp in sp_list:
193         krb = False
194         spname = sp['nameid']
195         spurl = 'http://%s:%s' % (sp['addr'], sp['port'])
196         sess = HttpSessions()
197         sess.add_server(idpname, 'http://%s:45080' % WRAP_HOSTNAME, user,
198                         'ipsilon')
199         sess.add_server(spname, spurl)
200
201         print ""
202         print "testnameid: Testing NameID format %s ..." % spname
203
204         if spname == 'kerberos':
205             krb = True
206
207         print "testnameid: Authenticate to IDP ...",
208         try:
209             sess.auth_to_idp(idpname, krb=krb)
210         except Exception, e:  # pylint: disable=broad-except
211             print >> sys.stderr, " ERROR: %s" % repr(e)
212             sys.exit(1)
213         print " SUCCESS"
214
215         print "testnameid: Add SP Metadata to IDP ...",
216         try:
217             sess.add_sp_metadata(idpname, spname)
218         except Exception, e:  # pylint: disable=broad-except
219             print >> sys.stderr, " ERROR: %s" % repr(e)
220             sys.exit(1)
221         print " SUCCESS"
222
223         print "testnameid: Set supported Name ID formats ...",
224         try:
225             sess.set_sp_default_nameids(idpname, spname, [spname])
226         except Exception, e:  # pylint: disable=broad-except
227             print >> sys.stderr, " ERROR: %s" % repr(e)
228             sys.exit(1)
229         print " SUCCESS"
230
231         print "testnameid: Access SP Protected Area ...",
232         try:
233             page = sess.fetch_page(idpname, '%s/sp/' % spurl)
234             if not re.match(expected_re[spname], page.text):
235                 raise ValueError(
236                     'page did not contain expression %s' %
237                     expected_re[spname]
238                 )
239         except ValueError, e:
240             if expected[spname]:
241                 print >> sys.stderr, " ERROR: %s" % repr(e)
242                 sys.exit(1)
243             print " OK, EXPECTED TO FAIL"
244         else:
245             print " SUCCESS"
246
247         print "testnameid: Try authentication failure ...",
248         newsess = HttpSessions()
249         newsess.add_server(idpname, 'http://%s:45080' % WRAP_HOSTNAME,
250                            user, 'wrong')
251         try:
252             newsess.auth_to_idp(idpname)
253             print >> sys.stderr, " ERROR: Authentication should have failed"
254             sys.exit(1)
255         except Exception, e:  # pylint: disable=broad-except
256             print " SUCCESS"
257
258     # Ensure that transient names change with each authentication
259     sp = get_sp_by_nameid(sp_list, 'transient')
260     spname = sp['nameid']
261     spurl = 'http://%s:%s' % (sp['addr'], sp['port'])
262
263     print ""
264     print "testnameid: Testing NameID format %s ..." % spname
265
266     ids = []
267     for i in xrange(4):
268         sess = HttpSessions()
269         sess.add_server(idpname, 'http://%s:45080' % WRAP_HOSTNAME,
270                         user, 'ipsilon')
271         sess.add_server(spname, spurl)
272         print "testnameid: Authenticate to IDP ...",
273         try:
274             sess.auth_to_idp(idpname)
275         except Exception, e:  # pylint: disable=broad-except
276             print >> sys.stderr, " ERROR: %s" % repr(e)
277             sys.exit(1)
278         else:
279             print " SUCCESS"
280
281         print "testnameid: Access SP ...",
282         try:
283             page = sess.fetch_page(idpname, '%s/sp/' % spurl)
284             t1 = page.text
285         except ValueError, e:
286             print >> sys.stderr, " ERROR: %s" % repr(e)
287             sys.exit(1)
288         else:
289             print " SUCCESS"
290
291         print "testnameid: Access SP again ...",
292         try:
293             page = sess.fetch_page(idpname, '%s/sp/' % spurl)
294             t2 = page.text
295         except ValueError, e:
296             print >> sys.stderr, " ERROR: %s" % repr(e)
297             sys.exit(1)
298         else:
299             print " SUCCESS"
300
301         print "testnameid: Ensure ID is consistent between requests ...",
302         if t1 != t2:
303             print >> sys.stderr, " ERROR: New ID between reqeusts"
304         else:
305             print " SUCCESS"
306
307         ids.append(t1)
308
309     print "testnameid: Ensure uniqueness across sessions ...",
310     if len(ids) != len(set(ids)):
311         print >> sys.stderr, " ERROR: IDs are not unique between sessions"
312         sys.exit(1)
313     else:
314         print " SUCCESS"
315
316     # Ensure that persistent names remain the same with each authentication
317     sp = get_sp_by_nameid(sp_list, 'persistent')
318     spname = sp['nameid']
319     spurl = 'http://%s:%s' % (sp['addr'], sp['port'])
320
321     print ""
322     print "testnameid: Testing NameID format %s ..." % spname
323
324     ids = []
325     for i in xrange(4):
326         sess = HttpSessions()
327         sess.add_server(idpname, 'http://%s:45080' % WRAP_HOSTNAME,
328                         user, 'ipsilon')
329         sess.add_server(spname, spurl)
330         print "testnameid: Authenticate to IDP ...",
331         try:
332             sess.auth_to_idp(idpname)
333         except Exception, e:  # pylint: disable=broad-except
334             print >> sys.stderr, " ERROR: %s" % repr(e)
335             sys.exit(1)
336         else:
337             print " SUCCESS"
338
339         print "testnameid: Access SP ...",
340         try:
341             page = sess.fetch_page(idpname, '%s/sp/' % spurl)
342             t1 = page.text
343         except ValueError, e:
344             print >> sys.stderr, " ERROR: %s" % repr(e)
345             sys.exit(1)
346         else:
347             print " SUCCESS"
348
349         print "testnameid: Access SP again ...",
350         try:
351             page = sess.fetch_page(idpname, '%s/sp/' % spurl)
352             t2 = page.text
353         except ValueError, e:
354             print >> sys.stderr, " ERROR: %s" % repr(e)
355             sys.exit(1)
356         else:
357             print " SUCCESS"
358
359         print "testnameid: Ensure ID is consistent between requests ...",
360         if t1 != t2:
361             print >> sys.stderr, " ERROR: New ID between reqeusts"
362         else:
363             print " SUCCESS"
364
365         ids.append(t1)
366
367     print "testnameid: Ensure same ID across sessions ...",
368     if len(set(ids)) != 1:
369         print >> sys.stderr, " ERROR: IDs are not the same between sessions"
370         sys.exit(1)
371     else:
372         print " SUCCESS"