Create cache directory for storing images for SP Portal
[cascardo/ipsilon.git] / tests / helpers / common.py
1 #!/usr/bin/python
2 #
3 # Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING
4
5 import ConfigParser
6 import io
7 import os
8 import pwd
9 import shutil
10 import signal
11 import random
12 from string import Template
13 import subprocess
14
15
16 WRAP_HOSTNAME = 'idp.ipsilon.dev'
17 TESTREALM = 'IPSILON.DEV'
18 TESTDOMAIN = 'ipsilon.dev'
19 KDC_DBNAME = 'db.file'
20 KDC_STASH = 'stash.file'
21 KDC_PASSWORD = 'ipsilon'
22 KRB5_CONF_TEMPLATE = '''
23 [libdefaults]
24   default_realm = ${TESTREALM}
25   dns_lookup_realm = false
26   dns_lookup_kdc = false
27   rdns = false
28   ticket_lifetime = 24h
29   forwardable = yes
30   default_ccache_name = FILE://${TESTDIR}/ccaches/krb5_ccache_XXXXXX
31   udp_preference_limit = 0
32
33 [realms]
34   ${TESTREALM} = {
35     kdc =${WRAP_HOSTNAME}
36   }
37
38 [domain_realm]
39   .${TESTDOMAIN} = ${TESTREALM}
40   ${TESTDOMAIN} = ${TESTREALM}
41
42 [dbmodules]
43   ${TESTREALM} = {
44     database_name = ${KDCDIR}/${KDC_DBNAME}
45   }
46 '''
47
48 KDC_CONF_TEMPLATE = '''
49 [kdcdefaults]
50  kdc_ports = 88
51  kdc_tcp_ports = 88
52  restrict_anonymous_to_tgt = true
53
54 [realms]
55  ${TESTREALM} = {
56   master_key_type = aes256-cts
57   max_life = 7d
58   max_renewable_life = 14d
59   acl_file = ${KDCDIR}/kadm5.acl
60   dict_file = /usr/share/dict/words
61   default_principal_flags = +preauth
62   admin_keytab = ${TESTREALM}/kadm5.keytab
63   key_stash_file = ${KDCDIR}/${KDC_STASH}
64  }
65 [logging]
66   kdc = FILE:${KDCLOG}
67 '''
68
69 USER_KTNAME = "user.keytab"
70 HTTP_KTNAME = "http.keytab"
71 KEY_TYPE = "aes256-cts-hmac-sha1-96:normal"
72
73
74 class IpsilonTestBase(object):
75
76     def __init__(self, name, execname):
77         self.name = name
78         self.execname = execname
79         self.rootdir = os.getcwd()
80         self.testdir = None
81         self.testuser = pwd.getpwuid(os.getuid())[0]
82         self.processes = []
83
84     def force_remove(self, op, name, info):
85         os.chmod(name, 0700)
86         os.remove(name)
87
88     def setup_base(self, path, test):
89         self.testdir = os.path.join(path, test.name)
90         if os.path.exists(self.testdir):
91             shutil.rmtree(self.testdir, onerror=self.force_remove)
92         os.makedirs(self.testdir)
93         shutil.copytree(os.path.join(self.rootdir, 'templates'),
94                         os.path.join(self.testdir, 'templates'))
95         os.mkdir(os.path.join(self.testdir, 'etc'))
96         os.mkdir(os.path.join(self.testdir, 'lib'))
97         os.mkdir(os.path.join(self.testdir, 'lib', test.name))
98         os.mkdir(os.path.join(self.testdir, 'log'))
99         os.mkdir(os.path.join(self.testdir, 'cache'))
100
101     def generate_profile(self, global_opts, args_opts, name, addr, port,
102                          nameid='unspecified'):
103         newconf = ConfigParser.ConfigParser()
104         newconf.add_section('globals')
105         for k in global_opts:
106             newconf.set('globals', k, global_opts[k])
107         newconf.add_section('arguments')
108         for k in args_opts:
109             newconf.set('arguments', k, args_opts[k])
110
111         profile = io.BytesIO()
112         newconf.write(profile)
113
114         t = Template(profile.getvalue())
115         text = t.substitute({'NAME': name, 'ADDRESS': addr, 'PORT': port,
116                              'TESTDIR': self.testdir,
117                              'ROOTDIR': self.rootdir,
118                              'NAMEID': nameid,
119                              'HTTP_KTNAME': HTTP_KTNAME,
120                              'TEST_USER': self.testuser})
121
122         filename = os.path.join(self.testdir, '%s_profile.cfg' % name)
123         with open(filename, 'wb') as f:
124             f.write(text)
125
126         return filename
127
128     def setup_http(self, name, addr, port):
129         httpdir = os.path.join(self.testdir, name)
130         os.mkdir(httpdir)
131         os.mkdir(os.path.join(httpdir, 'conf.d'))
132         os.mkdir(os.path.join(httpdir, 'html'))
133         os.mkdir(os.path.join(httpdir, 'logs'))
134         os.symlink('/etc/httpd/modules', os.path.join(httpdir, 'modules'))
135
136         with open(os.path.join(self.rootdir, 'tests/httpd.conf')) as f:
137             t = Template(f.read())
138             text = t.substitute({'HTTPROOT': httpdir,
139                                  'HTTPADDR': addr,
140                                  'HTTPPORT': port})
141         filename = os.path.join(httpdir, 'httpd.conf')
142         with open(filename, 'w+') as f:
143             f.write(text)
144
145         return filename
146
147     def setup_idp_server(self, profile, name, addr, port, env):
148         http_conf_file = self.setup_http(name, addr, port)
149         cmd = [os.path.join(self.rootdir,
150                             'ipsilon/install/ipsilon-server-install'),
151                '--config-profile=%s' % profile]
152         subprocess.check_call(cmd, env=env)
153         os.symlink(os.path.join(self.rootdir, 'ipsilon'),
154                    os.path.join(self.testdir, 'lib', name, 'ipsilon'))
155
156         return http_conf_file
157
158     def setup_sp_server(self, profile, name, addr, port, env):
159         http_conf_file = self.setup_http(name, addr, port)
160         cmd = [os.path.join(self.rootdir,
161                             'ipsilon/install/ipsilon-client-install'),
162                '--config-profile=%s' % profile]
163         subprocess.check_call(cmd, env=env)
164
165         return http_conf_file
166
167     def setup_pgdb(self, datadir, env):
168         cmd = ['/usr/bin/pg_ctl', 'initdb', '-D', datadir]
169         subprocess.check_call(cmd, env=env)
170         auth = 'host all all 127.0.0.1/24 trust\n'
171         filename = os.path.join(datadir, 'pg_hba.conf')
172         with open(filename, 'a') as f:
173             f.write(auth)
174
175     def start_http_server(self, conf, env):
176         env['MALLOC_CHECK_'] = '3'
177         env['MALLOC_PERTURB_'] = str(random.randint(0, 32767) % 255 + 1)
178         p = subprocess.Popen(['/usr/sbin/httpd', '-DFOREGROUND', '-f', conf],
179                              env=env, preexec_fn=os.setsid)
180         self.processes.append(p)
181         return p
182
183     def start_pgdb_server(self, datadir, rundir, log, addr, port, env):
184         p = subprocess.Popen(['/usr/bin/pg_ctl', 'start', '-D', datadir, '-o',
185                               '-c unix_socket_directories=%s -c port=%s -c \
186                                listen_addresses=%s' % (rundir, port, addr),
187                               '-l', log, '-w'],
188                              env=env, preexec_fn=os.setsid)
189         self.processes.append(p)
190         p.wait()
191         for d in ['adminconfig', 'users', 'transactions', 'sessions',
192                   'saml2.sessions.db']:
193             cmd = ['/usr/bin/createdb', '-h', addr, '-p', port, d]
194             subprocess.check_call(cmd, env=env)
195
196     def setup_ldap(self, env):
197         ldapdir = os.path.join(self.testdir, 'ldap')
198         os.mkdir(ldapdir)
199         with open(os.path.join(self.rootdir, 'tests/slapd.conf')) as f:
200             t = Template(f.read())
201             text = t.substitute({'ldapdir': ldapdir})
202         filename = os.path.join(ldapdir, 'slapd.conf')
203         with open(filename, 'w+') as f:
204             f.write(text)
205         subprocess.check_call(['/usr/sbin/slapadd', '-f', filename, '-l',
206                                'tests/ldapdata.ldif'], env=env)
207
208         return filename
209
210     def start_ldap_server(self, conf, addr, port, env):
211         p = subprocess.Popen(['/usr/sbin/slapd', '-d', '0', '-f', conf,
212                              '-h', 'ldap://%s:%s' % (addr, port)],
213                              env=env, preexec_fn=os.setsid)
214         self.processes.append(p)
215
216     def setup_kdc(self, env):
217
218         # setup kerberos environment
219         testlog = os.path.join(self.testdir, 'kerb.log')
220         krb5conf = os.path.join(self.testdir, 'krb5.conf')
221         kdcconf = os.path.join(self.testdir, 'kdc.conf')
222         kdcdir = os.path.join(self.testdir, 'kdc')
223         if os.path.exists(kdcdir):
224             shutil.rmtree(kdcdir)
225         os.makedirs(kdcdir)
226
227         t = Template(KRB5_CONF_TEMPLATE)
228         text = t.substitute({'TESTREALM': TESTREALM,
229                              'TESTDOMAIN': TESTDOMAIN,
230                              'TESTDIR': self.testdir,
231                              'KDCDIR': kdcdir,
232                              'KDC_DBNAME': KDC_DBNAME,
233                              'WRAP_HOSTNAME': WRAP_HOSTNAME})
234         with open(krb5conf, 'w+') as f:
235             f.write(text)
236
237         t = Template(KDC_CONF_TEMPLATE)
238         text = t.substitute({'TESTREALM': TESTREALM,
239                              'KDCDIR': kdcdir,
240                              'KDCLOG': testlog,
241                              'KDC_STASH': KDC_STASH})
242         with open(kdcconf, 'w+') as f:
243             f.write(text)
244
245         kdcenv = {'PATH': '/sbin:/bin:/usr/sbin:/usr/bin',
246                   'KRB5_CONFIG': krb5conf,
247                   'KRB5_KDC_PROFILE': kdcconf}
248         kdcenv.update(env)
249
250         with (open(testlog, 'a')) as logfile:
251             ksetup = subprocess.Popen(["kdb5_util", "create", "-s",
252                                        "-r", TESTREALM, "-P", KDC_PASSWORD],
253                                       stdout=logfile, stderr=logfile,
254                                       env=kdcenv, preexec_fn=os.setsid)
255         ksetup.wait()
256         if ksetup.returncode != 0:
257             raise ValueError('KDC Setup failed')
258
259         kdcproc = subprocess.Popen(['krb5kdc', '-n'],
260                                    env=kdcenv, preexec_fn=os.setsid)
261         self.processes.append(kdcproc)
262
263         return kdcenv
264
265     def kadmin_local(self, cmd, env, logfile):
266         ksetup = subprocess.Popen(["kadmin.local", "-q", cmd],
267                                   stdout=logfile, stderr=logfile,
268                                   env=env, preexec_fn=os.setsid)
269         ksetup.wait()
270         if ksetup.returncode != 0:
271             raise ValueError('Kadmin local [%s] failed' % cmd)
272
273     def setup_keys(self, env):
274
275         testlog = os.path.join(self.testdir, 'kerb.log')
276
277         svc_name = "HTTP/%s" % WRAP_HOSTNAME
278         svc_keytab = os.path.join(self.testdir, HTTP_KTNAME)
279         cmd = "addprinc -randkey -e %s %s" % (KEY_TYPE, svc_name)
280         with (open(testlog, 'a')) as logfile:
281             self.kadmin_local(cmd, env, logfile)
282         cmd = "ktadd -k %s -e %s %s" % (svc_keytab, KEY_TYPE, svc_name)
283         with (open(testlog, 'a')) as logfile:
284             self.kadmin_local(cmd, env, logfile)
285
286         usr_keytab = os.path.join(self.testdir, USER_KTNAME)
287         cmd = "addprinc -randkey -e %s %s" % (KEY_TYPE, self.testuser)
288         with (open(testlog, 'a')) as logfile:
289             self.kadmin_local(cmd, env, logfile)
290         cmd = "ktadd -k %s -e %s %s" % (usr_keytab, KEY_TYPE, self.testuser)
291         with (open(testlog, 'a')) as logfile:
292             self.kadmin_local(cmd, env, logfile)
293
294         keys_env = {"KRB5_KTNAME": svc_keytab}
295         keys_env.update(env)
296
297         return keys_env
298
299     def kinit_keytab(self, kdcenv):
300         testlog = os.path.join(self.testdir, 'kinit.log')
301         usr_keytab = os.path.join(self.testdir, USER_KTNAME)
302         kdcenv['KRB5CCNAME'] = 'FILE:' + os.path.join(
303             self.testdir, 'ccaches/user')
304         with (open(testlog, 'a')) as logfile:
305             logfile.write("\n%s\n" % kdcenv)
306             ksetup = subprocess.Popen(["kinit", "-kt", usr_keytab,
307                                        self.testuser],
308                                       stdout=logfile, stderr=logfile,
309                                       env=kdcenv, preexec_fn=os.setsid)
310             ksetup.wait()
311             if ksetup.returncode != 0:
312                 raise ValueError('kinit %s failed' % self.testuser)
313
314     def wait(self):
315         for p in self.processes:
316             os.killpg(p.pid, signal.SIGTERM)
317
318     def setup_servers(self, env=None):
319         raise NotImplementedError()
320
321     def run(self, env):
322         exe = self.execname
323         if exe.endswith('c'):
324             exe = exe[:-1]
325         return subprocess.call([exe], env=env)