3 # Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING
12 from string import Template
16 WRAP_HOSTNAME = 'idp.ipsilon.dev'
17 TESTREALM = 'IPSILON.DEV'
18 TESTDOMAIN = 'ipsilon.dev'
19 KDC_DBNAME = 'db.file'
20 KDC_STASH = 'stash.file'
21 KDC_PASSWORD = 'ipsilon'
22 KRB5_CONF_TEMPLATE = '''
24 default_realm = ${TESTREALM}
25 dns_lookup_realm = false
26 dns_lookup_kdc = false
30 default_ccache_name = FILE://${TESTDIR}/ccaches/krb5_ccache_XXXXXX
31 udp_preference_limit = 0
39 .${TESTDOMAIN} = ${TESTREALM}
40 ${TESTDOMAIN} = ${TESTREALM}
44 database_name = ${KDCDIR}/${KDC_DBNAME}
48 KDC_CONF_TEMPLATE = '''
52 restrict_anonymous_to_tgt = true
56 master_key_type = aes256-cts
58 max_renewable_life = 14d
59 acl_file = ${KDCDIR}/kadm5.acl
60 dict_file = /usr/share/dict/words
61 default_principal_flags = +preauth
62 admin_keytab = ${TESTREALM}/kadm5.keytab
63 key_stash_file = ${KDCDIR}/${KDC_STASH}
69 USER_KTNAME = "user.keytab"
70 HTTP_KTNAME = "http.keytab"
71 KEY_TYPE = "aes256-cts-hmac-sha1-96:normal"
74 class IpsilonTestBase(object):
76 def __init__(self, name, execname):
78 self.execname = execname
79 self.rootdir = os.getcwd()
81 self.testuser = pwd.getpwuid(os.getuid())[0]
84 def force_remove(self, op, name, info):
def setup_base(self, path, test):
    """Create a pristine working tree for ``test`` underneath ``path``.

    Any directory already present at ``<path>/<test.name>`` is deleted
    first, with ``self.force_remove`` installed as the rmtree error
    handler.  The ``templates`` tree is then copied over from
    ``self.rootdir`` and the empty ``etc``, ``lib``, ``lib/<test.name>``,
    ``log`` and ``cache`` directories are created.  The resulting
    location is recorded in ``self.testdir``.
    """
    workdir = os.path.join(path, test.name)
    self.testdir = workdir

    if os.path.exists(workdir):
        shutil.rmtree(workdir, onerror=self.force_remove)
    os.makedirs(workdir)

    template_src = os.path.join(self.rootdir, 'templates')
    template_dst = os.path.join(workdir, 'templates')
    shutil.copytree(template_src, template_dst)

    # Parents are listed before their children because os.mkdir does not
    # create intermediate directories.
    layout = (('etc',), ('lib',), ('lib', test.name), ('log',), ('cache',))
    for parts in layout:
        os.mkdir(os.path.join(workdir, *parts))
101 def generate_profile(self, global_opts, args_opts, name, addr, port,
102 nameid='unspecified'):
103 newconf = ConfigParser.ConfigParser()
104 newconf.add_section('globals')
105 for k in global_opts:
106 newconf.set('globals', k, global_opts[k])
107 newconf.add_section('arguments')
109 newconf.set('arguments', k, args_opts[k])
111 profile = io.BytesIO()
112 newconf.write(profile)
114 t = Template(profile.getvalue())
115 text = t.substitute({'NAME': name, 'ADDRESS': addr, 'PORT': port,
116 'TESTDIR': self.testdir,
117 'ROOTDIR': self.rootdir,
119 'HTTP_KTNAME': HTTP_KTNAME,
120 'TEST_USER': self.testuser})
122 filename = os.path.join(self.testdir, '%s_profile.cfg' % name)
123 with open(filename, 'wb') as f:
128 def setup_http(self, name, addr, port):
129 httpdir = os.path.join(self.testdir, name)
131 os.mkdir(os.path.join(httpdir, 'conf.d'))
132 os.mkdir(os.path.join(httpdir, 'html'))
133 os.mkdir(os.path.join(httpdir, 'logs'))
134 os.symlink('/etc/httpd/modules', os.path.join(httpdir, 'modules'))
136 with open(os.path.join(self.rootdir, 'tests/httpd.conf')) as f:
137 t = Template(f.read())
138 text = t.substitute({'HTTPROOT': httpdir,
141 filename = os.path.join(httpdir, 'httpd.conf')
142 with open(filename, 'w+') as f:
def setup_idp_server(self, profile, name, addr, port, env):
    """Install an identity-provider instance and return its httpd config.

    The HTTP layout is prepared through ``self.setup_http``; the
    ``ipsilon-server-install`` script from the source tree is then run
    with ``profile`` as its config profile (``subprocess.check_call``
    raises ``CalledProcessError`` on a non-zero exit).  Finally the
    source ``ipsilon`` directory is symlinked into
    ``<testdir>/lib/<name>/ipsilon``.

    Returns whatever ``self.setup_http`` returned.
    """
    http_conf_file = self.setup_http(name, addr, port)

    installer = os.path.join(self.rootdir,
                             'ipsilon/install/ipsilon-server-install')
    subprocess.check_call([installer, '--config-profile=%s' % profile],
                          env=env)

    # Expose the in-tree package under the instance's lib directory.
    package_src = os.path.join(self.rootdir, 'ipsilon')
    package_link = os.path.join(self.testdir, 'lib', name, 'ipsilon')
    os.symlink(package_src, package_link)

    return http_conf_file
def setup_sp_server(self, profile, name, addr, port, env):
    """Install a service-provider instance and return its httpd config.

    The HTTP layout is prepared through ``self.setup_http``, after which
    the ``ipsilon-client-install`` script from the source tree is run
    with ``profile`` as its config profile (``subprocess.check_call``
    raises ``CalledProcessError`` on a non-zero exit).

    Returns whatever ``self.setup_http`` returned.
    """
    http_conf_file = self.setup_http(name, addr, port)

    installer = os.path.join(self.rootdir,
                             'ipsilon/install/ipsilon-client-install')
    subprocess.check_call([installer, '--config-profile=%s' % profile],
                          env=env)

    return http_conf_file
167 def setup_pgdb(self, datadir, env):
168 cmd = ['/usr/bin/pg_ctl', 'initdb', '-D', datadir]
169 subprocess.check_call(cmd, env=env)
170 auth = 'host all all 127.0.0.1/24 trust\n'
171 filename = os.path.join(datadir, 'pg_hba.conf')
172 with open(filename, 'a') as f:
175 def start_http_server(self, conf, env):
176 env['MALLOC_CHECK_'] = '3'
177 env['MALLOC_PERTURB_'] = str(random.randint(0, 32767) % 255 + 1)
178 p = subprocess.Popen(['/usr/sbin/httpd', '-DFOREGROUND', '-f', conf],
179 env=env, preexec_fn=os.setsid)
180 self.processes.append(p)
183 def start_pgdb_server(self, datadir, rundir, log, addr, port, env):
184 p = subprocess.Popen(['/usr/bin/pg_ctl', 'start', '-D', datadir, '-o',
185 '-c unix_socket_directories=%s -c port=%s -c \
186 listen_addresses=%s' % (rundir, port, addr),
188 env=env, preexec_fn=os.setsid)
189 self.processes.append(p)
191 for d in ['adminconfig', 'users', 'transactions', 'sessions',
192 'saml2.sessions.db']:
193 cmd = ['/usr/bin/createdb', '-h', addr, '-p', port, d]
194 subprocess.check_call(cmd, env=env)
196 def setup_ldap(self, env):
197 ldapdir = os.path.join(self.testdir, 'ldap')
199 with open(os.path.join(self.rootdir, 'tests/slapd.conf')) as f:
200 t = Template(f.read())
201 text = t.substitute({'ldapdir': ldapdir})
202 filename = os.path.join(ldapdir, 'slapd.conf')
203 with open(filename, 'w+') as f:
205 subprocess.check_call(['/usr/sbin/slapadd', '-f', filename, '-l',
206 'tests/ldapdata.ldif'], env=env)
def start_ldap_server(self, conf, addr, port, env):
    """Launch ``/usr/sbin/slapd`` on ``ldap://<addr>:<port>`` using ``conf``.

    The daemon is started in its own session (``os.setsid``) with debug
    level 0, and the resulting process handle is appended to
    ``self.processes``.
    """
    listen_url = 'ldap://%s:%s' % (addr, port)
    argv = ['/usr/sbin/slapd', '-d', '0', '-f', conf, '-h', listen_url]
    server = subprocess.Popen(argv, env=env, preexec_fn=os.setsid)
    self.processes.append(server)
216 def setup_kdc(self, env):
218 # setup kerberos environment
219 testlog = os.path.join(self.testdir, 'kerb.log')
220 krb5conf = os.path.join(self.testdir, 'krb5.conf')
221 kdcconf = os.path.join(self.testdir, 'kdc.conf')
222 kdcdir = os.path.join(self.testdir, 'kdc')
223 if os.path.exists(kdcdir):
224 shutil.rmtree(kdcdir)
227 t = Template(KRB5_CONF_TEMPLATE)
228 text = t.substitute({'TESTREALM': TESTREALM,
229 'TESTDOMAIN': TESTDOMAIN,
230 'TESTDIR': self.testdir,
232 'KDC_DBNAME': KDC_DBNAME,
233 'WRAP_HOSTNAME': WRAP_HOSTNAME})
234 with open(krb5conf, 'w+') as f:
237 t = Template(KDC_CONF_TEMPLATE)
238 text = t.substitute({'TESTREALM': TESTREALM,
241 'KDC_STASH': KDC_STASH})
242 with open(kdcconf, 'w+') as f:
245 kdcenv = {'PATH': '/sbin:/bin:/usr/sbin:/usr/bin',
246 'KRB5_CONFIG': krb5conf,
247 'KRB5_KDC_PROFILE': kdcconf}
250 with (open(testlog, 'a')) as logfile:
251 ksetup = subprocess.Popen(["kdb5_util", "create", "-s",
252 "-r", TESTREALM, "-P", KDC_PASSWORD],
253 stdout=logfile, stderr=logfile,
254 env=kdcenv, preexec_fn=os.setsid)
256 if ksetup.returncode != 0:
257 raise ValueError('KDC Setup failed')
259 kdcproc = subprocess.Popen(['krb5kdc', '-n'],
260 env=kdcenv, preexec_fn=os.setsid)
261 self.processes.append(kdcproc)
265 def kadmin_local(self, cmd, env, logfile):
266 ksetup = subprocess.Popen(["kadmin.local", "-q", cmd],
267 stdout=logfile, stderr=logfile,
268 env=env, preexec_fn=os.setsid)
270 if ksetup.returncode != 0:
271 raise ValueError('Kadmin local [%s] failed' % cmd)
273 def setup_keys(self, env):
275 testlog = os.path.join(self.testdir, 'kerb.log')
277 svc_name = "HTTP/%s" % WRAP_HOSTNAME
278 svc_keytab = os.path.join(self.testdir, HTTP_KTNAME)
279 cmd = "addprinc -randkey -e %s %s" % (KEY_TYPE, svc_name)
280 with (open(testlog, 'a')) as logfile:
281 self.kadmin_local(cmd, env, logfile)
282 cmd = "ktadd -k %s -e %s %s" % (svc_keytab, KEY_TYPE, svc_name)
283 with (open(testlog, 'a')) as logfile:
284 self.kadmin_local(cmd, env, logfile)
286 usr_keytab = os.path.join(self.testdir, USER_KTNAME)
287 cmd = "addprinc -randkey -e %s %s" % (KEY_TYPE, self.testuser)
288 with (open(testlog, 'a')) as logfile:
289 self.kadmin_local(cmd, env, logfile)
290 cmd = "ktadd -k %s -e %s %s" % (usr_keytab, KEY_TYPE, self.testuser)
291 with (open(testlog, 'a')) as logfile:
292 self.kadmin_local(cmd, env, logfile)
294 keys_env = {"KRB5_KTNAME": svc_keytab}
299 def kinit_keytab(self, kdcenv):
300 testlog = os.path.join(self.testdir, 'kinit.log')
301 usr_keytab = os.path.join(self.testdir, USER_KTNAME)
302 kdcenv['KRB5CCNAME'] = 'FILE:' + os.path.join(
303 self.testdir, 'ccaches/user')
304 with (open(testlog, 'a')) as logfile:
305 logfile.write("\n%s\n" % kdcenv)
306 ksetup = subprocess.Popen(["kinit", "-kt", usr_keytab,
308 stdout=logfile, stderr=logfile,
309 env=kdcenv, preexec_fn=os.setsid)
311 if ksetup.returncode != 0:
312 raise ValueError('kinit %s failed' % self.testuser)
315 for p in self.processes:
316 os.killpg(p.pid, signal.SIGTERM)
def setup_servers(self, env=None):
    """Placeholder that always raises ``NotImplementedError``.

    NOTE(review): presumably overridden by concrete test classes to
    bring up their servers -- confirm against the subclasses.
    """
    raise NotImplementedError()
323 if exe.endswith('c'):
325 return subprocess.call([exe], env=env)