Configure the SAML2 session database during installation
[cascardo/ipsilon.git] / tests / helpers / common.py
index d020a3c..93d0f17 100755 (executable)
@@ -1,38 +1,79 @@
 #!/usr/bin/python
 #
-# Copyright (C) 2014  Simo Sorce <simo@redhat.com>
-#
-# see file 'COPYING' for use and warranty information
-#
-# This program is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program.  If not, see <http://www.gnu.org/licenses/>.
-
+# Copyright (C) 2014 Ipsilon project Contributors, for license see COPYING
 
 import ConfigParser
 import io
-from ipsilon.util.plugin import PluginObject
 import os
 import pwd
 import shutil
 import signal
+import random
 from string import Template
 import subprocess
 
 
-class IpsilonTestBase(PluginObject):
+WRAP_HOSTNAME = 'idp.ipsilon.dev'
+TESTREALM = 'IPSILON.DEV'
+TESTDOMAIN = 'ipsilon.dev'
+KDC_DBNAME = 'db.file'
+KDC_STASH = 'stash.file'
+KDC_PASSWORD = 'ipsilon'
+KRB5_CONF_TEMPLATE = '''
+[libdefaults]
+  default_realm = ${TESTREALM}
+  dns_lookup_realm = false
+  dns_lookup_kdc = false
+  rdns = false
+  ticket_lifetime = 24h
+  forwardable = yes
+  default_ccache_name = FILE://${TESTDIR}/ccaches/krb5_ccache_XXXXXX
+  udp_preference_limit = 0
+
+[realms]
+  ${TESTREALM} = {
+    kdc =${WRAP_HOSTNAME}
+  }
+
+[domain_realm]
+  .${TESTDOMAIN} = ${TESTREALM}
+  ${TESTDOMAIN} = ${TESTREALM}
+
+[dbmodules]
+  ${TESTREALM} = {
+    database_name = ${KDCDIR}/${KDC_DBNAME}
+  }
+'''
+
+KDC_CONF_TEMPLATE = '''
+[kdcdefaults]
+ kdc_ports = 88
+ kdc_tcp_ports = 88
+ restrict_anonymous_to_tgt = true
+
+[realms]
+ ${TESTREALM} = {
+  master_key_type = aes256-cts
+  max_life = 7d
+  max_renewable_life = 14d
+  acl_file = ${KDCDIR}/kadm5.acl
+  dict_file = /usr/share/dict/words
+  default_principal_flags = +preauth
+  admin_keytab = ${TESTREALM}/kadm5.keytab
+  key_stash_file = ${KDCDIR}/${KDC_STASH}
+ }
+[logging]
+  kdc = FILE:${KDCLOG}
+'''
+
+USER_KTNAME = "user.keytab"
+HTTP_KTNAME = "http.keytab"
+KEY_TYPE = "aes256-cts-hmac-sha1-96:normal"
+
+
+class IpsilonTestBase(object):
 
     def __init__(self, name, execname):
-        super(IpsilonTestBase, self).__init__()
         self.name = name
         self.execname = execname
         self.rootdir = os.getcwd()
@@ -56,7 +97,8 @@ class IpsilonTestBase(PluginObject):
         os.mkdir(os.path.join(self.testdir, 'lib', test.name))
         os.mkdir(os.path.join(self.testdir, 'log'))
 
-    def generate_profile(self, global_opts, args_opts, name, addr, port):
+    def generate_profile(self, global_opts, args_opts, name, addr, port,
+                         nameid='unspecified'):
         newconf = ConfigParser.ConfigParser()
         newconf.add_section('globals')
         for k in global_opts.keys():
@@ -72,6 +114,8 @@ class IpsilonTestBase(PluginObject):
         text = t.substitute({'NAME': name, 'ADDRESS': addr, 'PORT': port,
                              'TESTDIR': self.testdir,
                              'ROOTDIR': self.rootdir,
+                             'NAMEID': nameid,
+                             'HTTP_KTNAME': HTTP_KTNAME,
                              'TEST_USER': self.testuser})
 
         filename = os.path.join(self.testdir, '%s_profile.cfg' % name)
@@ -119,11 +163,152 @@ class IpsilonTestBase(PluginObject):
 
         return http_conf_file
 
+    def setup_pgdb(self, datadir, env):
+        cmd = ['/usr/bin/pg_ctl', 'initdb', '-D', datadir]
+        subprocess.check_call(cmd, env=env)
+        auth = 'host all all 127.0.0.1/24 trust\n'
+        filename = os.path.join(datadir, 'pg_hba.conf')
+        with open(filename, 'a') as f:
+            f.write(auth)
+
     def start_http_server(self, conf, env):
+        env['MALLOC_CHECK_'] = '3'
+        env['MALLOC_PERTURB_'] = str(random.randint(0, 32767) % 255 + 1)
         p = subprocess.Popen(['/usr/sbin/httpd', '-DFOREGROUND', '-f', conf],
                              env=env, preexec_fn=os.setsid)
         self.processes.append(p)
 
+    def start_pgdb_server(self, datadir, rundir, log, addr, port, env):
+        p = subprocess.Popen(['/usr/bin/pg_ctl', 'start', '-D', datadir, '-o',
+                              '-c unix_socket_directories=%s -c port=%s -c \
+                               listen_addresses=%s' % (rundir, port, addr),
+                              '-l', log, '-w'],
+                             env=env, preexec_fn=os.setsid)
+        self.processes.append(p)
+        p.wait()
+        for d in ['adminconfig', 'users', 'transactions', 'sessions',
+                  'saml2sessions']:
+            cmd = ['/usr/bin/createdb', '-h', addr, '-p', port, d]
+            subprocess.check_call(cmd, env=env)
+
+    def setup_ldap(self, env):
+        ldapdir = os.path.join(self.testdir, 'ldap')
+        os.mkdir(ldapdir)
+        with open(os.path.join(self.rootdir, 'tests/slapd.conf')) as f:
+            t = Template(f.read())
+            text = t.substitute({'ldapdir': ldapdir})
+        filename = os.path.join(ldapdir, 'slapd.conf')
+        with open(filename, 'w+') as f:
+            f.write(text)
+        subprocess.check_call(['/usr/sbin/slapadd', '-f', filename, '-l',
+                               'tests/ldapdata.ldif'], env=env)
+
+        return filename
+
+    def start_ldap_server(self, conf, addr, port, env):
+        p = subprocess.Popen(['/usr/sbin/slapd', '-d', '0', '-f', conf,
+                             '-h', 'ldap://%s:%s' % (addr, port)],
+                             env=env, preexec_fn=os.setsid)
+        self.processes.append(p)
+
+    def setup_kdc(self, env):
+
+        # setup kerberos environment
+        testlog = os.path.join(self.testdir, 'kerb.log')
+        krb5conf = os.path.join(self.testdir, 'krb5.conf')
+        kdcconf = os.path.join(self.testdir, 'kdc.conf')
+        kdcdir = os.path.join(self.testdir, 'kdc')
+        if os.path.exists(kdcdir):
+            shutil.rmtree(kdcdir)
+        os.makedirs(kdcdir)
+
+        t = Template(KRB5_CONF_TEMPLATE)
+        text = t.substitute({'TESTREALM': TESTREALM,
+                             'TESTDOMAIN': TESTDOMAIN,
+                             'TESTDIR': self.testdir,
+                             'KDCDIR': kdcdir,
+                             'KDC_DBNAME': KDC_DBNAME,
+                             'WRAP_HOSTNAME': WRAP_HOSTNAME})
+        with open(krb5conf, 'w+') as f:
+            f.write(text)
+
+        t = Template(KDC_CONF_TEMPLATE)
+        text = t.substitute({'TESTREALM': TESTREALM,
+                             'KDCDIR': kdcdir,
+                             'KDCLOG': testlog,
+                             'KDC_STASH': KDC_STASH})
+        with open(kdcconf, 'w+') as f:
+            f.write(text)
+
+        kdcenv = {'PATH': '/sbin:/bin:/usr/sbin:/usr/bin',
+                  'KRB5_CONFIG': krb5conf,
+                  'KRB5_KDC_PROFILE': kdcconf}
+        kdcenv.update(env)
+
+        with (open(testlog, 'a')) as logfile:
+            ksetup = subprocess.Popen(["kdb5_util", "create", "-s",
+                                       "-r", TESTREALM, "-P", KDC_PASSWORD],
+                                      stdout=logfile, stderr=logfile,
+                                      env=kdcenv, preexec_fn=os.setsid)
+        ksetup.wait()
+        if ksetup.returncode != 0:
+            raise ValueError('KDC Setup failed')
+
+        kdcproc = subprocess.Popen(['krb5kdc', '-n'],
+                                   env=kdcenv, preexec_fn=os.setsid)
+        self.processes.append(kdcproc)
+
+        return kdcenv
+
+    def kadmin_local(self, cmd, env, logfile):
+        ksetup = subprocess.Popen(["kadmin.local", "-q", cmd],
+                                  stdout=logfile, stderr=logfile,
+                                  env=env, preexec_fn=os.setsid)
+        ksetup.wait()
+        if ksetup.returncode != 0:
+            raise ValueError('Kadmin local [%s] failed' % cmd)
+
+    def setup_keys(self, env):
+
+        testlog = os.path.join(self.testdir, 'kerb.log')
+
+        svc_name = "HTTP/%s" % WRAP_HOSTNAME
+        svc_keytab = os.path.join(self.testdir, HTTP_KTNAME)
+        cmd = "addprinc -randkey -e %s %s" % (KEY_TYPE, svc_name)
+        with (open(testlog, 'a')) as logfile:
+            self.kadmin_local(cmd, env, logfile)
+        cmd = "ktadd -k %s -e %s %s" % (svc_keytab, KEY_TYPE, svc_name)
+        with (open(testlog, 'a')) as logfile:
+            self.kadmin_local(cmd, env, logfile)
+
+        usr_keytab = os.path.join(self.testdir, USER_KTNAME)
+        cmd = "addprinc -randkey -e %s %s" % (KEY_TYPE, self.testuser)
+        with (open(testlog, 'a')) as logfile:
+            self.kadmin_local(cmd, env, logfile)
+        cmd = "ktadd -k %s -e %s %s" % (usr_keytab, KEY_TYPE, self.testuser)
+        with (open(testlog, 'a')) as logfile:
+            self.kadmin_local(cmd, env, logfile)
+
+        keys_env = {"KRB5_KTNAME": svc_keytab}
+        keys_env.update(env)
+
+        return keys_env
+
+    def kinit_keytab(self, kdcenv):
+        testlog = os.path.join(self.testdir, 'kinit.log')
+        usr_keytab = os.path.join(self.testdir, USER_KTNAME)
+        kdcenv['KRB5CCNAME'] = 'FILE:' + os.path.join(
+            self.testdir, 'ccaches/user')
+        with (open(testlog, 'a')) as logfile:
+            logfile.write("\n%s\n" % kdcenv)
+            ksetup = subprocess.Popen(["kinit", "-kt", usr_keytab,
+                                       self.testuser],
+                                      stdout=logfile, stderr=logfile,
+                                      env=kdcenv, preexec_fn=os.setsid)
+            ksetup.wait()
+            if ksetup.returncode != 0:
+                raise ValueError('kinit %s failed' % self.testuser)
+
     def wait(self):
         for p in self.processes:
             os.killpg(p.pid, signal.SIGTERM)