8e6c53b9179d32563576765c26850924fb9316b5
[cascardo/ipsilon.git] / tests / helpers / common.py
1 #!/usr/bin/python
2 #
3 # Copyright (C) 2014  Simo Sorce <simo@redhat.com>
4 #
5 # see file 'COPYING' for use and warranty information
6 #
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
11 #
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15 # GNU General Public License for more details.
16 #
17 # You should have received a copy of the GNU General Public License
18 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
19
20
21 import ConfigParser
22 import io
23 import os
24 import pwd
25 import shutil
26 import signal
27 import random
28 from string import Template
29 import subprocess
30
31
32 WRAP_HOSTNAME = 'idp.ipsilon.dev'
33 TESTREALM = 'IPSILON.DEV'
34 TESTDOMAIN = 'ipsilon.dev'
35 KDC_DBNAME = 'db.file'
36 KDC_STASH = 'stash.file'
37 KDC_PASSWORD = 'ipsilon'
38 KRB5_CONF_TEMPLATE = '''
39 [libdefaults]
40   default_realm = ${TESTREALM}
41   dns_lookup_realm = false
42   dns_lookup_kdc = false
43   rdns = false
44   ticket_lifetime = 24h
45   forwardable = yes
46   default_ccache_name = FILE://${TESTDIR}/ccaches/krb5_ccache_XXXXXX
47   udp_preference_limit = 0
48
49 [realms]
50   ${TESTREALM} = {
51     kdc =${WRAP_HOSTNAME}
52   }
53
54 [domain_realm]
55   .${TESTDOMAIN} = ${TESTREALM}
56   ${TESTDOMAIN} = ${TESTREALM}
57
58 [dbmodules]
59   ${TESTREALM} = {
60     database_name = ${KDCDIR}/${KDC_DBNAME}
61   }
62 '''
63
64 KDC_CONF_TEMPLATE = '''
65 [kdcdefaults]
66  kdc_ports = 88
67  kdc_tcp_ports = 88
68  restrict_anonymous_to_tgt = true
69
70 [realms]
71  ${TESTREALM} = {
72   master_key_type = aes256-cts
73   max_life = 7d
74   max_renewable_life = 14d
75   acl_file = ${KDCDIR}/kadm5.acl
76   dict_file = /usr/share/dict/words
77   default_principal_flags = +preauth
78   admin_keytab = ${TESTREALM}/kadm5.keytab
79   key_stash_file = ${KDCDIR}/${KDC_STASH}
80  }
81 [logging]
82   kdc = FILE:${KDCLOG}
83 '''
84
85 USER_KTNAME = "user.keytab"
86 HTTP_KTNAME = "http.keytab"
87 KEY_TYPE = "aes256-cts-hmac-sha1-96:normal"
88
89
90 class IpsilonTestBase(object):
91
92     def __init__(self, name, execname):
93         self.name = name
94         self.execname = execname
95         self.rootdir = os.getcwd()
96         self.testdir = None
97         self.testuser = pwd.getpwuid(os.getuid())[0]
98         self.processes = []
99
100     def force_remove(self, op, name, info):
101         os.chmod(name, 0700)
102         os.remove(name)
103
104     def setup_base(self, path, test):
105         self.testdir = os.path.join(path, test.name)
106         if os.path.exists(self.testdir):
107             shutil.rmtree(self.testdir, onerror=self.force_remove)
108         os.makedirs(self.testdir)
109         shutil.copytree(os.path.join(self.rootdir, 'templates'),
110                         os.path.join(self.testdir, 'templates'))
111         os.mkdir(os.path.join(self.testdir, 'etc'))
112         os.mkdir(os.path.join(self.testdir, 'lib'))
113         os.mkdir(os.path.join(self.testdir, 'lib', test.name))
114         os.mkdir(os.path.join(self.testdir, 'log'))
115
116     def generate_profile(self, global_opts, args_opts, name, addr, port,
117                          nameid='unspecified'):
118         newconf = ConfigParser.ConfigParser()
119         newconf.add_section('globals')
120         for k in global_opts.keys():
121             newconf.set('globals', k, global_opts[k])
122         newconf.add_section('arguments')
123         for k in args_opts.keys():
124             newconf.set('arguments', k, args_opts[k])
125
126         profile = io.BytesIO()
127         newconf.write(profile)
128
129         t = Template(profile.getvalue())
130         text = t.substitute({'NAME': name, 'ADDRESS': addr, 'PORT': port,
131                              'TESTDIR': self.testdir,
132                              'ROOTDIR': self.rootdir,
133                              'NAMEID': nameid,
134                              'HTTP_KTNAME': HTTP_KTNAME,
135                              'TEST_USER': self.testuser})
136
137         filename = os.path.join(self.testdir, '%s_profile.cfg' % name)
138         with open(filename, 'wb') as f:
139             f.write(text)
140
141         return filename
142
143     def setup_http(self, name, addr, port):
144         httpdir = os.path.join(self.testdir, name)
145         os.mkdir(httpdir)
146         os.mkdir(os.path.join(httpdir, 'conf.d'))
147         os.mkdir(os.path.join(httpdir, 'html'))
148         os.mkdir(os.path.join(httpdir, 'logs'))
149         os.symlink('/etc/httpd/modules', os.path.join(httpdir, 'modules'))
150
151         with open(os.path.join(self.rootdir, 'tests/httpd.conf')) as f:
152             t = Template(f.read())
153             text = t.substitute({'HTTPROOT': httpdir,
154                                  'HTTPADDR': addr,
155                                  'HTTPPORT': port})
156         filename = os.path.join(httpdir, 'httpd.conf')
157         with open(filename, 'w+') as f:
158             f.write(text)
159
160         return filename
161
162     def setup_idp_server(self, profile, name, addr, port, env):
163         http_conf_file = self.setup_http(name, addr, port)
164         cmd = [os.path.join(self.rootdir,
165                             'ipsilon/install/ipsilon-server-install'),
166                '--config-profile=%s' % profile]
167         subprocess.check_call(cmd, env=env)
168         os.symlink(os.path.join(self.rootdir, 'ipsilon'),
169                    os.path.join(self.testdir, 'lib', name, 'ipsilon'))
170
171         return http_conf_file
172
173     def setup_sp_server(self, profile, name, addr, port, env):
174         http_conf_file = self.setup_http(name, addr, port)
175         cmd = [os.path.join(self.rootdir,
176                             'ipsilon/install/ipsilon-client-install'),
177                '--config-profile=%s' % profile]
178         subprocess.check_call(cmd, env=env)
179
180         return http_conf_file
181
182     def setup_pgdb(self, datadir, env):
183         cmd = ['/usr/bin/pg_ctl', 'initdb', '-D', datadir]
184         subprocess.check_call(cmd, env=env)
185         auth = 'host all all 127.0.0.1/24 trust\n'
186         filename = os.path.join(datadir, 'pg_hba.conf')
187         with open(filename, 'a') as f:
188             f.write(auth)
189
190     def start_http_server(self, conf, env):
191         env['MALLOC_CHECK_'] = '3'
192         env['MALLOC_PERTURB_'] = str(random.randint(0, 32767) % 255 + 1)
193         p = subprocess.Popen(['/usr/sbin/httpd', '-DFOREGROUND', '-f', conf],
194                              env=env, preexec_fn=os.setsid)
195         self.processes.append(p)
196
197     def start_pgdb_server(self, datadir, rundir, log, addr, port, env):
198         p = subprocess.Popen(['/usr/bin/pg_ctl', 'start', '-D', datadir, '-o',
199                               '-c unix_socket_directories=%s -c port=%s -c \
200                                listen_addresses=%s' % (rundir, port, addr),
201                               '-l', log, '-w'],
202                              env=env, preexec_fn=os.setsid)
203         self.processes.append(p)
204         p.wait()
205         for d in ['adminconfig', 'users', 'transactions', 'sessions']:
206             cmd = ['/usr/bin/createdb', '-h', addr, '-p', port, d]
207             subprocess.check_call(cmd, env=env)
208
209     def setup_ldap(self, env):
210         ldapdir = os.path.join(self.testdir, 'ldap')
211         os.mkdir(ldapdir)
212         with open(os.path.join(self.rootdir, 'tests/slapd.conf')) as f:
213             t = Template(f.read())
214             text = t.substitute({'ldapdir': ldapdir})
215         filename = os.path.join(ldapdir, 'slapd.conf')
216         with open(filename, 'w+') as f:
217             f.write(text)
218         subprocess.check_call(['/usr/sbin/slapadd', '-f', filename, '-l',
219                                'tests/ldapdata.ldif'], env=env)
220
221         return filename
222
223     def start_ldap_server(self, conf, addr, port, env):
224         p = subprocess.Popen(['/usr/sbin/slapd', '-d', '0', '-f', conf,
225                              '-h', 'ldap://%s:%s' % (addr, port)],
226                              env=env, preexec_fn=os.setsid)
227         self.processes.append(p)
228
229     def setup_kdc(self, env):
230
231         # setup kerberos environment
232         testlog = os.path.join(self.testdir, 'kerb.log')
233         krb5conf = os.path.join(self.testdir, 'krb5.conf')
234         kdcconf = os.path.join(self.testdir, 'kdc.conf')
235         kdcdir = os.path.join(self.testdir, 'kdc')
236         if os.path.exists(kdcdir):
237             shutil.rmtree(kdcdir)
238         os.makedirs(kdcdir)
239
240         t = Template(KRB5_CONF_TEMPLATE)
241         text = t.substitute({'TESTREALM': TESTREALM,
242                              'TESTDOMAIN': TESTDOMAIN,
243                              'TESTDIR': self.testdir,
244                              'KDCDIR': kdcdir,
245                              'KDC_DBNAME': KDC_DBNAME,
246                              'WRAP_HOSTNAME': WRAP_HOSTNAME})
247         with open(krb5conf, 'w+') as f:
248             f.write(text)
249
250         t = Template(KDC_CONF_TEMPLATE)
251         text = t.substitute({'TESTREALM': TESTREALM,
252                              'KDCDIR': kdcdir,
253                              'KDCLOG': testlog,
254                              'KDC_STASH': KDC_STASH})
255         with open(kdcconf, 'w+') as f:
256             f.write(text)
257
258         kdcenv = {'PATH': '/sbin:/bin:/usr/sbin:/usr/bin',
259                   'KRB5_CONFIG': krb5conf,
260                   'KRB5_KDC_PROFILE': kdcconf}
261         kdcenv.update(env)
262
263         with (open(testlog, 'a')) as logfile:
264             ksetup = subprocess.Popen(["kdb5_util", "create", "-s",
265                                        "-r", TESTREALM, "-P", KDC_PASSWORD],
266                                       stdout=logfile, stderr=logfile,
267                                       env=kdcenv, preexec_fn=os.setsid)
268         ksetup.wait()
269         if ksetup.returncode != 0:
270             raise ValueError('KDC Setup failed')
271
272         kdcproc = subprocess.Popen(['krb5kdc', '-n'],
273                                    env=kdcenv, preexec_fn=os.setsid)
274         self.processes.append(kdcproc)
275
276         return kdcenv
277
278     def kadmin_local(self, cmd, env, logfile):
279         ksetup = subprocess.Popen(["kadmin.local", "-q", cmd],
280                                   stdout=logfile, stderr=logfile,
281                                   env=env, preexec_fn=os.setsid)
282         ksetup.wait()
283         if ksetup.returncode != 0:
284             raise ValueError('Kadmin local [%s] failed' % cmd)
285
286     def setup_keys(self, env):
287
288         testlog = os.path.join(self.testdir, 'kerb.log')
289
290         svc_name = "HTTP/%s" % WRAP_HOSTNAME
291         svc_keytab = os.path.join(self.testdir, HTTP_KTNAME)
292         cmd = "addprinc -randkey -e %s %s" % (KEY_TYPE, svc_name)
293         with (open(testlog, 'a')) as logfile:
294             self.kadmin_local(cmd, env, logfile)
295         cmd = "ktadd -k %s -e %s %s" % (svc_keytab, KEY_TYPE, svc_name)
296         with (open(testlog, 'a')) as logfile:
297             self.kadmin_local(cmd, env, logfile)
298
299         usr_keytab = os.path.join(self.testdir, USER_KTNAME)
300         cmd = "addprinc -randkey -e %s %s" % (KEY_TYPE, self.testuser)
301         with (open(testlog, 'a')) as logfile:
302             self.kadmin_local(cmd, env, logfile)
303         cmd = "ktadd -k %s -e %s %s" % (usr_keytab, KEY_TYPE, self.testuser)
304         with (open(testlog, 'a')) as logfile:
305             self.kadmin_local(cmd, env, logfile)
306
307         keys_env = {"KRB5_KTNAME": svc_keytab}
308         keys_env.update(env)
309
310         return keys_env
311
312     def kinit_keytab(self, kdcenv):
313         testlog = os.path.join(self.testdir, 'kinit.log')
314         usr_keytab = os.path.join(self.testdir, USER_KTNAME)
315         kdcenv['KRB5CCNAME'] = 'FILE:' + os.path.join(
316             self.testdir, 'ccaches/user')
317         with (open(testlog, 'a')) as logfile:
318             logfile.write("\n%s\n" % kdcenv)
319             ksetup = subprocess.Popen(["kinit", "-kt", usr_keytab,
320                                        self.testuser],
321                                       stdout=logfile, stderr=logfile,
322                                       env=kdcenv, preexec_fn=os.setsid)
323             ksetup.wait()
324             if ksetup.returncode != 0:
325                 raise ValueError('kinit %s failed' % self.testuser)
326
327     def wait(self):
328         for p in self.processes:
329             os.killpg(p.pid, signal.SIGTERM)
330
331     def setup_servers(self, env=None):
332         raise NotImplementedError()
333
334     def run(self, env):
335         exe = self.execname
336         if exe.endswith('c'):
337             exe = exe[:-1]
338         return subprocess.call([exe], env=env)