3 # Copyright (C) 2014 Simo Sorce <simo@redhat.com>
5 # see file 'COPYING' for use and warranty information
7 # This program is free software; you can redistribute it and/or modify
8 # it under the terms of the GNU General Public License as published by
9 # the Free Software Foundation, either version 3 of the License, or
10 # (at your option) any later version.
12 # This program is distributed in the hope that it will be useful,
13 # but WITHOUT ANY WARRANTY; without even the implied warranty of
14 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 # GNU General Public License for more details.
17 # You should have received a copy of the GNU General Public License
18 # along with this program. If not, see <http://www.gnu.org/licenses/>.
28 from string import Template
# Host name, Kerberos realm and DNS domain used by the test Kerberos setup
# (WRAP_HOSTNAME also names the HTTP/<host> service principal in setup_keys).
32 WRAP_HOSTNAME = 'idp.ipsilon.dev'
33 TESTREALM = 'IPSILON.DEV'
34 TESTDOMAIN = 'ipsilon.dev'
# File names substituted into the configuration templates below as
# ${KDC_DBNAME} and ${KDC_STASH}.
35 KDC_DBNAME = 'db.file'
36 KDC_STASH = 'stash.file'
# Password handed to ``kdb5_util create -P`` in setup_kdc.
37 KDC_PASSWORD = 'ipsilon'
# string.Template sources for the generated krb5.conf (KRB5_CONF_TEMPLATE) and
# kdc.conf (KDC_CONF_TEMPLATE); setup_kdc fills in the ${...} placeholders.
# NOTE(review): parts of both literals, including their closing quotes, are
# not visible in this extract, so no comments are added further down (they
# would end up inside the string data).
# NOTE(review): admin_keytab is rooted at ${TESTREALM} while the neighbouring
# paths use ${KDCDIR} -- confirm this is intended.
38 KRB5_CONF_TEMPLATE = '''
40 default_realm = ${TESTREALM}
41 dns_lookup_realm = false
42 dns_lookup_kdc = false
46 default_ccache_name = FILE://${TESTDIR}/ccaches/krb5_ccache_XXXXXX
47 udp_preference_limit = 0
55 .${TESTDOMAIN} = ${TESTREALM}
56 ${TESTDOMAIN} = ${TESTREALM}
60 database_name = ${KDCDIR}/${KDC_DBNAME}
64 KDC_CONF_TEMPLATE = '''
68 restrict_anonymous_to_tgt = true
72 master_key_type = aes256-cts
74 max_renewable_life = 14d
75 acl_file = ${KDCDIR}/kadm5.acl
76 dict_file = /usr/share/dict/words
77 default_principal_flags = +preauth
78 admin_keytab = ${TESTREALM}/kadm5.keytab
79 key_stash_file = ${KDCDIR}/${KDC_STASH}
# Keytab file names (joined onto the test directory in setup_keys and
# kinit_keytab) and the key/salt specification passed to kadmin's ``-e``.
85 USER_KTNAME = "user.keytab"
86 HTTP_KTNAME = "http.keytab"
87 KEY_TYPE = "aes256-cts-hmac-sha1-96:normal"
# Base class for the test drivers: its methods build a per-test directory
# tree and configure/start httpd, PostgreSQL, slapd and a Kerberos KDC in it.
90 class IpsilonTestBase(object):
# NOTE(review): other methods rely on self.processes and self.testdir; their
# initialisation is not visible in this extract (lines are missing here).
92 def __init__(self, name, execname):
94 self.execname = execname
# Directory the run was started from; 'templates', 'tests/...' and
# 'ipsilon/...' are later resolved relative to it.
95 self.rootdir = os.getcwd()
# Login name of the current user (first field of its passwd entry); used as
# the Kerberos user principal and as TEST_USER in generated profiles.
97 self.testuser = pwd.getpwuid(os.getuid())[0]
# Passed as the ``onerror`` callback to shutil.rmtree() in setup_base.
# NOTE(review): the body of this method is not visible in this extract.
100 def force_remove(self, op, name, info):
def setup_base(self, path, test):
    """Create a fresh working tree for ``test`` below ``path``.

    The tree lives at ``<path>/<test.name>`` and is stored in
    ``self.testdir``. Any existing directory at that location is removed
    first (``self.force_remove`` handles entries rmtree cannot delete).
    The ``templates`` directory is copied from ``self.rootdir`` and empty
    ``etc``, ``lib``, ``lib/<test.name>`` and ``log`` directories are added.
    """
    workdir = os.path.join(path, test.name)
    self.testdir = workdir

    # Start from scratch: drop whatever a previous run left behind.
    if os.path.exists(workdir):
        shutil.rmtree(workdir, onerror=self.force_remove)
    os.makedirs(workdir)

    template_src = os.path.join(self.rootdir, 'templates')
    shutil.copytree(template_src, os.path.join(workdir, 'templates'))

    # Order matters: 'lib' has to exist before 'lib/<test.name>'.
    for parts in (('etc',), ('lib',), ('lib', test.name), ('log',)):
        os.mkdir(os.path.join(workdir, *parts))
# Build a config profile with a 'globals' and an 'arguments' section from the
# two option dicts, expand ${...} placeholders in the serialised text and open
# <testdir>/<name>_profile.cfg for writing.
# NOTE(review): ``nameid`` is not referenced in the visible lines, and the
# write/return at the end of the method are missing from this extract.
116 def generate_profile(self, global_opts, args_opts, name, addr, port,
117 nameid='unspecified'):
118 newconf = ConfigParser.ConfigParser()
119 newconf.add_section('globals')
120 for k in global_opts.keys():
121 newconf.set('globals', k, global_opts[k])
122 newconf.add_section('arguments')
123 for k in args_opts.keys():
124 newconf.set('arguments', k, args_opts[k])
# Serialise into an in-memory buffer so the text can be used as a template.
126 profile = io.BytesIO()
127 newconf.write(profile)
# Option values may contain placeholders such as ${NAME}; substitute() raises
# KeyError for any placeholder that has no entry in the mapping.
129 t = Template(profile.getvalue())
130 text = t.substitute({'NAME': name, 'ADDRESS': addr, 'PORT': port,
131 'TESTDIR': self.testdir,
132 'ROOTDIR': self.rootdir,
134 'HTTP_KTNAME': HTTP_KTNAME,
135 'TEST_USER': self.testuser})
137 filename = os.path.join(self.testdir, '%s_profile.cfg' % name)
# NOTE(review): the body of this ``with`` block is not visible here.
138 with open(filename, 'wb') as f:
# Lay out an httpd server root at <testdir>/<name> (conf.d, html, logs and a
# 'modules' symlink to /etc/httpd/modules) and render the tests/httpd.conf
# template into <httpdir>/httpd.conf.
# NOTE(review): creation of ``httpdir`` itself, the rest of the substitution
# mapping, the file write and the return value are not visible in this
# extract; callers assign the result to ``http_conf_file``.
143 def setup_http(self, name, addr, port):
144 httpdir = os.path.join(self.testdir, name)
146 os.mkdir(os.path.join(httpdir, 'conf.d'))
147 os.mkdir(os.path.join(httpdir, 'html'))
148 os.mkdir(os.path.join(httpdir, 'logs'))
149 os.symlink('/etc/httpd/modules', os.path.join(httpdir, 'modules'))
# The template is read relative to the directory the run was started from.
151 with open(os.path.join(self.rootdir, 'tests/httpd.conf')) as f:
152 t = Template(f.read())
153 text = t.substitute({'HTTPROOT': httpdir,
156 filename = os.path.join(httpdir, 'httpd.conf')
157 with open(filename, 'w+') as f:
def setup_idp_server(self, profile, name, addr, port, env):
    """Prepare an identity-provider instance called ``name``.

    Sets up the httpd tree via ``setup_http``, runs
    ``ipsilon/install/ipsilon-server-install`` (from ``self.rootdir``) with
    ``--config-profile=<profile>`` in environment ``env``, and links the
    ``ipsilon`` source directory into ``<testdir>/lib/<name>/ipsilon``.

    Returns whatever ``setup_http`` returned (presumably the path of the
    generated httpd configuration file). ``subprocess.CalledProcessError``
    propagates if the installer exits with a non-zero status.
    """
    conf_path = self.setup_http(name, addr, port)

    installer = os.path.join(self.rootdir,
                             'ipsilon/install/ipsilon-server-install')
    install_cmd = [installer, '--config-profile=%s' % profile]
    subprocess.check_call(install_cmd, env=env)

    link_target = os.path.join(self.rootdir, 'ipsilon')
    link_name = os.path.join(self.testdir, 'lib', name, 'ipsilon')
    os.symlink(link_target, link_name)

    return conf_path
def setup_sp_server(self, profile, name, addr, port, env):
    """Prepare a service-provider instance called ``name``.

    Sets up the httpd tree via ``setup_http`` and then runs
    ``ipsilon/install/ipsilon-client-install`` (from ``self.rootdir``) with
    ``--config-profile=<profile>`` in environment ``env``.

    Returns whatever ``setup_http`` returned (presumably the path of the
    generated httpd configuration file). ``subprocess.CalledProcessError``
    propagates if the installer exits with a non-zero status.
    """
    conf_path = self.setup_http(name, addr, port)

    installer = os.path.join(self.rootdir,
                             'ipsilon/install/ipsilon-client-install')
    install_cmd = [installer, '--config-profile=%s' % profile]
    subprocess.check_call(install_cmd, env=env)

    return conf_path
# Initialise a PostgreSQL data directory with ``pg_ctl initdb`` and open its
# pg_hba.conf in append mode.
# NOTE(review): the body of the ``with`` block is not visible in this extract;
# presumably it appends ``auth`` (a trust rule for 127.0.0.1/24) -- confirm.
182 def setup_pgdb(self, datadir, env):
183 cmd = ['/usr/bin/pg_ctl', 'initdb', '-D', datadir]
184 subprocess.check_call(cmd, env=env)
185 auth = 'host all all 127.0.0.1/24 trust\n'
186 filename = os.path.join(datadir, 'pg_hba.conf')
187 with open(filename, 'a') as f:
def start_http_server(self, conf, env):
    """Launch ``/usr/sbin/httpd`` in the foreground using config ``conf``.

    ``env`` is modified in place: ``MALLOC_CHECK_`` is set to ``'3'`` and
    ``MALLOC_PERTURB_`` to a random value between 1 and 255 (the glibc
    allocator debugging variables), so later users of the same dict see
    them too. The child is started through ``os.setsid`` and the Popen
    object is appended to ``self.processes``.
    """
    env['MALLOC_CHECK_'] = '3'
    perturb = random.randint(0, 32767) % 255 + 1
    env['MALLOC_PERTURB_'] = str(perturb)

    argv = ['/usr/sbin/httpd', '-DFOREGROUND', '-f', conf]
    httpd = subprocess.Popen(argv, env=env, preexec_fn=os.setsid)
    self.processes.append(httpd)
# Start PostgreSQL through ``pg_ctl start`` on ``datadir`` with the socket
# directory, port and listen address passed via ``-o``, track the process,
# then create the four databases listed in the loop with ``createdb``.
# NOTE(review): the end of the pg_ctl argument list (where ``log`` would be
# used) and whatever runs between start-up and the createdb loop are not
# visible in this extract.
# NOTE(review): the backslash continuation sits inside the string literal, so
# any leading whitespace on the continuation line becomes part of the ``-o``
# value.
197 def start_pgdb_server(self, datadir, rundir, log, addr, port, env):
198 p = subprocess.Popen(['/usr/bin/pg_ctl', 'start', '-D', datadir, '-o',
199 '-c unix_socket_directories=%s -c port=%s -c \
200 listen_addresses=%s' % (rundir, port, addr),
202 env=env, preexec_fn=os.setsid)
203 self.processes.append(p)
# check_call raises CalledProcessError if any createdb invocation fails.
205 for d in ['adminconfig', 'users', 'transactions', 'sessions']:
206 cmd = ['/usr/bin/createdb', '-h', addr, '-p', port, d]
207 subprocess.check_call(cmd, env=env)
# Render the tests/slapd.conf template into <testdir>/ldap/slapd.conf and load
# the LDIF data with slapadd.
# NOTE(review): creation of ``ldapdir``, the write of ``text`` and the return
# value are not visible in this extract.
209 def setup_ldap(self, env):
210 ldapdir = os.path.join(self.testdir, 'ldap')
212 with open(os.path.join(self.rootdir, 'tests/slapd.conf')) as f:
213 t = Template(f.read())
214 text = t.substitute({'ldapdir': ldapdir})
215 filename = os.path.join(ldapdir, 'slapd.conf')
216 with open(filename, 'w+') as f:
# NOTE(review): the LDIF path is relative to the current working directory,
# whereas the slapd.conf template above is resolved against self.rootdir.
218 subprocess.check_call(['/usr/sbin/slapadd', '-f', filename, '-l',
219 'tests/ldapdata.ldif'], env=env)
def start_ldap_server(self, conf, addr, port, env):
    """Launch ``/usr/sbin/slapd`` with config ``conf`` on ``addr:port``.

    slapd is run with ``-d 0`` and told to listen on
    ``ldap://<addr>:<port>``. The child is started through ``os.setsid``
    with environment ``env`` and the Popen object is appended to
    ``self.processes``.
    """
    listen_url = 'ldap://%s:%s' % (addr, port)
    argv = ['/usr/sbin/slapd', '-d', '0', '-f', conf, '-h', listen_url]
    slapd = subprocess.Popen(argv, env=env, preexec_fn=os.setsid)
    self.processes.append(slapd)
# Generate krb5.conf and kdc.conf for the test realm under the test
# directory, create the KDC database with kdb5_util and start krb5kdc.
# NOTE(review): ``env`` is not referenced in the visible lines, and several
# lines of this method (file writes, the wait on kdb5_util, the return) are
# missing from this extract.
229 def setup_kdc(self, env):
231 # setup kerberos environment
232 testlog = os.path.join(self.testdir, 'kerb.log')
233 krb5conf = os.path.join(self.testdir, 'krb5.conf')
234 kdcconf = os.path.join(self.testdir, 'kdc.conf')
235 kdcdir = os.path.join(self.testdir, 'kdc')
# Discard any KDC directory left over from an earlier run.
236 if os.path.exists(kdcdir):
237 shutil.rmtree(kdcdir)
# Render the krb5.conf text from its template.
240 t = Template(KRB5_CONF_TEMPLATE)
241 text = t.substitute({'TESTREALM': TESTREALM,
242 'TESTDOMAIN': TESTDOMAIN,
243 'TESTDIR': self.testdir,
245 'KDC_DBNAME': KDC_DBNAME,
246 'WRAP_HOSTNAME': WRAP_HOSTNAME})
247 with open(krb5conf, 'w+') as f:
# Render the kdc.conf text from its template.
250 t = Template(KDC_CONF_TEMPLATE)
251 text = t.substitute({'TESTREALM': TESTREALM,
254 'KDC_STASH': KDC_STASH})
255 with open(kdcconf, 'w+') as f:
# Environment for the Kerberos tools: a fixed PATH plus pointers to the two
# configuration files generated above.
258 kdcenv = {'PATH': '/sbin:/bin:/usr/sbin:/usr/bin',
259 'KRB5_CONFIG': krb5conf,
260 'KRB5_KDC_PROFILE': kdcconf}
# Create the realm database, sending the tool's output to kerb.log.
263 with (open(testlog, 'a')) as logfile:
264 ksetup = subprocess.Popen(["kdb5_util", "create", "-s",
265 "-r", TESTREALM, "-P", KDC_PASSWORD],
266 stdout=logfile, stderr=logfile,
267 env=kdcenv, preexec_fn=os.setsid)
# NOTE(review): returncode is only set once the process has been waited on;
# that call is not visible in this extract.
269 if ksetup.returncode != 0:
270 raise ValueError('KDC Setup failed')
# Start the KDC itself and track it alongside the other child processes.
272 kdcproc = subprocess.Popen(['krb5kdc', '-n'],
273 env=kdcenv, preexec_fn=os.setsid)
274 self.processes.append(kdcproc)
# Run one ``kadmin.local -q <cmd>`` query with stdout/stderr sent to
# ``logfile``; raise ValueError if it exits with a non-zero status.
# NOTE(review): the call that waits for the process (required before
# returncode is meaningful) is not visible in this extract.
278 def kadmin_local(self, cmd, env, logfile):
279 ksetup = subprocess.Popen(["kadmin.local", "-q", cmd],
280 stdout=logfile, stderr=logfile,
281 env=env, preexec_fn=os.setsid)
283 if ksetup.returncode != 0:
284 raise ValueError('Kadmin local [%s] failed' % cmd)
# Create the HTTP/<WRAP_HOSTNAME> service principal and a principal for the
# test user (both with random keys) and export each to a keytab in the test
# directory, logging kadmin output to kerb.log.
# NOTE(review): the tail of this method (how ``keys_env`` is used and what is
# returned) is not visible in this extract.
286 def setup_keys(self, env):
288 testlog = os.path.join(self.testdir, 'kerb.log')
# Service principal and its keytab.
290 svc_name = "HTTP/%s" % WRAP_HOSTNAME
291 svc_keytab = os.path.join(self.testdir, HTTP_KTNAME)
292 cmd = "addprinc -randkey -e %s %s" % (KEY_TYPE, svc_name)
293 with (open(testlog, 'a')) as logfile:
294 self.kadmin_local(cmd, env, logfile)
295 cmd = "ktadd -k %s -e %s %s" % (svc_keytab, KEY_TYPE, svc_name)
296 with (open(testlog, 'a')) as logfile:
297 self.kadmin_local(cmd, env, logfile)
# User principal and its keytab (consumed by kinit_keytab).
299 usr_keytab = os.path.join(self.testdir, USER_KTNAME)
300 cmd = "addprinc -randkey -e %s %s" % (KEY_TYPE, self.testuser)
301 with (open(testlog, 'a')) as logfile:
302 self.kadmin_local(cmd, env, logfile)
303 cmd = "ktadd -k %s -e %s %s" % (usr_keytab, KEY_TYPE, self.testuser)
304 with (open(testlog, 'a')) as logfile:
305 self.kadmin_local(cmd, env, logfile)
# Environment fragment pointing at the service keytab.
307 keys_env = {"KRB5_KTNAME": svc_keytab}
# Obtain Kerberos credentials from the user keytab with ``kinit -kt``, logging
# to <testdir>/kinit.log; raise ValueError if kinit exits non-zero.
# NOTE(review): the end of the kinit argument list and the wait on the process
# are not visible in this extract.
312 def kinit_keytab(self, kdcenv):
313 testlog = os.path.join(self.testdir, 'kinit.log')
314 usr_keytab = os.path.join(self.testdir, USER_KTNAME)
# The caller's dict is modified in place: the credential cache is pointed at
# <testdir>/ccaches/user.
315 kdcenv['KRB5CCNAME'] = 'FILE:' + os.path.join(
316 self.testdir, 'ccaches/user')
317 with (open(testlog, 'a')) as logfile:
# The environment in use is recorded in the log ahead of kinit's output.
318 logfile.write("\n%s\n" % kdcenv)
319 ksetup = subprocess.Popen(["kinit", "-kt", usr_keytab,
321 stdout=logfile, stderr=logfile,
322 env=kdcenv, preexec_fn=os.setsid)
324 if ksetup.returncode != 0:
325 raise ValueError('kinit %s failed' % self.testuser)
# NOTE(review): fragment -- the enclosing ``def`` line is not visible in this
# extract.
# Send SIGTERM to the process group of every tracked child; the children are
# started with preexec_fn=os.setsid, so each pid is also a process-group id.
328 for p in self.processes:
329 os.killpg(p.pid, signal.SIGTERM)
def setup_servers(self, env=None):
    """Placeholder that always raises ``NotImplementedError``.

    Presumably concrete test classes override this to set up the servers
    they need; the base implementation ignores ``env``.
    """
    raise NotImplementedError()
# NOTE(review): fragment -- the enclosing ``def`` line and the body of this
# ``if`` are not visible in this extract.
336 if exe.endswith('c'):
# Run ``exe`` as a child process with ``env`` and return its exit status.
338 return subprocess.call([exe], env=env)