Move parsing code into helpers module
authorSimo Sorce <simo@redhat.com>
Sun, 15 Jun 2014 21:46:47 +0000 (17:46 -0400)
committerSimo Sorce <simo@redhat.com>
Sun, 15 Jun 2014 22:04:00 +0000 (18:04 -0400)
This way common test actions can be easily reused by multiple tests.

Signed-off-by: Simo Sorce <simo@redhat.com>
tests/helpers/__init__.py [new file with mode: 0644]
tests/helpers/http.py [new file with mode: 0755]
tests/test1.py

diff --git a/tests/helpers/__init__.py b/tests/helpers/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/tests/helpers/http.py b/tests/helpers/http.py
new file mode 100755 (executable)
index 0000000..425d6b7
--- /dev/null
@@ -0,0 +1,241 @@
+#!/usr/bin/python
+#
+# Copyright (C) 2014  Simo Sorce <simo@redhat.com>
+#
+# see file 'COPYING' for use and warranty information
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+
+
+from lxml import html
+import requests
+import string
+import urlparse
+
+
+class WrongPage(Exception):
+    pass
+
+
+class PageTree(object):
+
+    def __init__(self, result):
+        self.result = result
+        self.text = result.text
+        self._tree = None
+
+    @property
+    def tree(self):
+        if not self._tree:
+            self._tree = html.fromstring(self.text)
+        return self._tree
+
+    def first_value(self, rule):
+        result = self.tree.xpath(rule)
+        if type(result) is list:
+            if len(result) > 0:
+                result = result[0]
+            else:
+                result = None
+        return result
+
+    def all_values(self, rule):
+        result = self.tree.xpath(rule)
+        if type(result) is list:
+            return result
+        return [result]
+
+    def make_referer(self):
+        return self.result.url
+
+    def expected_value(self, rule, expected):
+        value = self.first_value(rule)
+        if value != expected:
+            raise ValueError("Expected [%s], got [%s]" % (expected, value))
+
+
+class HttpSessions(object):
+
+    def __init__(self):
+        self.servers = dict()
+
+    def add_server(self, name, baseuri, user=None, pwd=None):
+        new = {'baseuri': baseuri,
+               'session': requests.Session()}
+        if user:
+            new['user'] = user
+        if pwd:
+            new['pwd'] = pwd
+        self.servers[name] = new
+
+    def get_session(self, url):
+        for srv in self.servers:
+            d = self.servers[srv]
+            if url.startswith(d['baseuri']):
+                return d['session']
+
+        raise ValueError("Unknown URL: %s" % url)
+
+    def get(self, url, **kwargs):
+        session = self.get_session(url)
+        return session.get(url, allow_redirects=False, **kwargs)
+
+    def post(self, url, **kwargs):
+        session = self.get_session(url)
+        return session.post(url, allow_redirects=False, **kwargs)
+
+    def access(self, action, url, **kwargs):
+        action = string.lower(action)
+        if action == 'get':
+            return self.get(url, **kwargs)
+        elif action == 'post':
+            return self.post(url, **kwargs)
+        else:
+            raise ValueError("Unknown action type: [%s]" % action)
+
+    def new_url(self, referer, action):
+        if action.startswith('/'):
+            u = urlparse.urlparse(referer)
+            return '%s://%s%s' % (u.scheme, u.netloc, action)
+        return action
+
+    def get_form_data(self, page, form_id, input_fields):
+        values = []
+        action = page.first_value('//form[@id="%s"]/@action' % form_id)
+        values.append(action)
+        method = page.first_value('//form[@id="%s"]/@method' % form_id)
+        values.append(method)
+        for field in input_fields:
+            value = page.all_values('//form[@id="%s"]/input/@%s' % (form_id,
+                                                                    field))
+            values.append(value)
+        return values
+
+    def handle_login_form(self, idp, page):
+        if type(page) != PageTree:
+            raise TypeError("Expected PageTree object")
+
+        srv = self.servers[idp]
+
+        try:
+            results = self.get_form_data(page, "login_form", [])
+            action_url = results[0]
+            method = results[1]
+            if action_url is None:
+                raise Exception
+        except Exception:  # pylint: disable=broad-except
+            raise WrongPage("Not a Login Form Page")
+
+        referer = page.make_referer()
+        headers = {'referer': referer}
+        payload = {'login_name': srv['user'],
+                   'login_password': srv['pwd']}
+
+        return [method, self.new_url(referer, action_url),
+                {'headers': headers, 'data': payload}]
+
+    def handle_return_form(self, page):
+        if type(page) != PageTree:
+            raise TypeError("Expected PageTree object")
+
+        try:
+            results = self.get_form_data(page, "saml-response",
+                                         ["name", "value"])
+            action_url = results[0]
+            if action_url is None:
+                raise Exception
+            method = results[1]
+            names = results[2]
+            values = results[3]
+        except Exception:  # pylint: disable=broad-except
+            raise WrongPage("Not a Return Form Page")
+
+        referer = page.make_referer()
+        headers = {'referer': referer}
+
+        payload = {}
+        for i in range(0, len(names)):
+            payload[names[i]] = values[i]
+
+        return [method, self.new_url(referer, action_url),
+                {'headers': headers, 'data': payload}]
+
+    def fetch_page(self, idp, target_url):
+        url = target_url
+        action = 'get'
+        args = {}
+
+        while True:
+            r = self.access(action, url, **args)  # pylint: disable=star-args
+            if r.status_code == 303:
+                url = r.headers['location']
+                action = 'get'
+                args = {}
+            elif r.status_code == 200:
+                page = PageTree(r)
+
+                try:
+                    (action, url, args) = self.handle_login_form(idp, page)
+                    continue
+                except WrongPage:
+                    pass
+
+                try:
+                    (action, url, args) = self.handle_return_form(page)
+                    continue
+                except WrongPage:
+                    pass
+
+                # Either we got what we wanted, or we have to stop anyway
+                return page
+            else:
+                raise ValueError("Unhandled status (%d) on url %s" % (
+                                 r.status_code, url))
+
+    def auth_to_idp(self, idp):
+
+        srv = self.servers[idp]
+        target_url = '%s/%s/' % (srv['baseuri'], idp)
+
+        r = self.access('get', target_url)
+        if r.status_code != 200:
+            raise ValueError("Access to idp failed: %s" % repr(r))
+
+        page = PageTree(r)
+        page.expected_value('//div[@id="content"]/p/a/text()', 'Log In')
+        href = page.first_value('//div[@id="content"]/p/a/@href')
+        url = self.new_url(target_url, href)
+
+        page = self.fetch_page(idp, url)
+        page.expected_value('//div[@id="welcome"]/p/text()',
+                            'Welcome %s!' % srv['user'])
+
+    def add_sp_metadata(self, idp, sp):
+        idpsrv = self.servers[idp]
+        idpuri = idpsrv['baseuri']
+        spuri = self.servers[sp]['baseuri']
+
+        url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp)
+        headers = {'referer': url}
+        payload = {'name': sp}
+        m = requests.get('%s/saml2/metadata' % spuri)
+        metafile = {'metafile': m.content}
+        r = idpsrv['session'].post(url, headers=headers,
+                                   data=payload, files=metafile)
+        if r.status_code != 200:
+            raise ValueError('Failed to post SP data [%s]' % repr(r))
+
+        page = PageTree(r)
+        page.expected_value('//div[@class="alert alert-success"]/p/text()',
+                            'SP Successfully added')
index 6e76f88..411ac6e 100755 (executable)
 # along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
 
-from lxml import html
+from helpers import http  # pylint: disable=relative-import
 import os
 import pwd
-import requests
 import sys
-import urlparse
 
 
-def get_session(srvs, url):
-    for srv in srvs:
-        if url.startswith(srv['baseuri']):
-            return srv['session']
-
-    raise ValueError("Unknown URL: %s" % url)
-
-
-def get_url(srvs, url, **kwargs):
-    session = get_session(srvs, url)
-    return session.get(url, allow_redirects=False, **kwargs)
-
-
-def post_url(srvs, url, **kwargs):
-    session = get_session(srvs, url)
-    return session.post(url, allow_redirects=False, **kwargs)
-
-
-def access_url(action, srvs, url, **kwargs):
-    if action == 'get':
-        return get_url(srvs, url, **kwargs)
-    elif action == 'post':
-        return post_url(srvs, url, **kwargs)
-    else:
-        raise ValueError("Unknown action type: [%s]" % action)
-
-
-def get_new_url(referer, action):
-    if action.startswith('/'):
-        u = urlparse.urlparse(referer)
-        return '%s://%s%s' % (u.scheme, u.netloc, action)
-    return action
-
-
-def parse_first(tree, rule):
-    result = tree.xpath(rule)
-    if type(result) is list:
-        if len(result) > 0:
-            result = result[0]
-        else:
-            result = None
-    return result
-
-
-def parse_list(tree, rule):
-    result = tree.xpath(rule)
-    if type(result) is list:
-        return result
-    return [result]
-
-
-def handle_login_form(idp, r):
-    tree = html.fromstring(r.text)
-    try:
-        action_url = parse_first(tree, '//form[@id="login_form"]/@action')
-        method = parse_first(tree, '//form[@id="login_form"]/@method')
-    except Exception:  # pylint: disable=broad-except
-        return []
-
-    if action_url is None:
-        return []
-
-    headers = {'referer': r.url}
-    payload = {'login_name': idp['user'],
-               'login_password': idp['pass']}
-
-    return [method,
-            get_new_url(r.url, action_url),
-            {'headers': headers, 'data': payload}]
-
-
-def handle_return_form(r):
-    tree = html.fromstring(r.text)
-    try:
-        action_url = parse_first(tree, '//form[@id="saml-response"]/@action')
-        method = parse_first(tree, '//form[@id="saml-response"]/@method')
-        names = parse_list(tree, '//form[@id="saml-response"]/input/@name')
-        values = parse_list(tree, '//form[@id="saml-response"]/input/@value')
-    except Exception:  # pylint: disable=broad-except
-        return []
-
-    if action_url is None:
-        return []
-
-    headers = {'referer': r.url}
-    payload = {}
-    for i in range(0, len(names)):
-        payload[names[i]] = values[i]
-
-    return [method,
-            get_new_url(r.url, action_url),
-            {'headers': headers, 'data': payload}]
-
-
-def go_to_url(srvs, idp, start_url, target_url):
-
-    url = start_url
-    action = 'get'
-    args = {}
-
-    good = True
-    while good:
-        r = access_url(action, srvs, url, **args)  # pylint: disable=star-args
-        if r.status_code == 303:
-            url = r.headers['location']
-            action = 'get'
-            args = {}
-        elif r.status_code == 200:
-            if url == target_url:
-                return r.text
-
-            result = handle_login_form(idp, r)
-            if result:
-                action = result[0]
-                url = result[1]
-                args = result[2]
-                continue
-
-            result = handle_return_form(r)
-            if result:
-                action = result[0]
-                url = result[1]
-                args = result[2]
-                continue
-
-            raise ValueError("Unhandled Success code at url %s" % url)
-
-        else:
-            good = False
-
-    raise ValueError("Unhandled status (%d) on url %s" % (r.status_code, url))
-
-
-def auth_to_idp(idp):
-
-    target_url = '%s/%s/' % (idp['baseuri'], idp['name'])
-    srvs = [idp]
+if __name__ == '__main__':
+    basedir = sys.argv[1]
 
-    r = access_url('get', srvs, target_url)
-    if r.status_code != 200:
-        print >> sys.stderr, " ERROR: Access to idp failed: %s" % repr(r)
-        return False
+    idpname = 'idp1'
+    spname = 'sp1'
+    user = pwd.getpwuid(os.getuid())[0]
 
-    tree = html.fromstring(r.text)
-    try:
-        expected = 'Log In'
-        login = parse_first(tree, '//div[@id="content"]/p/a/text()')
-        if login != expected:
-            print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected,
-                                                                     login)
-        href = parse_first(tree, '//div[@id="content"]/p/a/@href')
-        start_url = get_new_url(target_url, href)
-    except Exception, e:  # pylint: disable=broad-except
-        print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e)
-        return False
+    sess = http.HttpSessions()
+    sess.add_server(idpname, 'http://127.0.0.10:45080', user, 'ipsilon')
+    sess.add_server(spname, 'http://127.0.0.11:45081')
 
+    print "test1: Authenticate to IDP ...",
     try:
-        page = go_to_url(srvs, idp, start_url, target_url)
+        sess.auth_to_idp(idpname)
     except Exception, e:  # pylint: disable=broad-except
         print >> sys.stderr, " ERROR: %s" % repr(e)
-        return False
-
-    tree = html.fromstring(page)
-    try:
-        welcome = parse_first(tree, '//div[@id="welcome"]/p/text()')
-    except Exception, e:  # pylint: disable=broad-except
-        print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e)
-        return False
-
-    expected = 'Welcome %s!' % idp['user']
-    if welcome != expected:
-        print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected,
-                                                                 welcome)
-        return False
-
-    return True
-
-
-def add_sp_metadata(idp, sp):
-    url = '%s/%s/admin/providers/saml2/admin/new' % (idp['baseuri'],
-                                                     idp['name'])
-    headers = {'referer': url}
-    payload = {'name': sp['name']}
-    m = requests.get('%s/saml2/metadata' % sp['baseuri'])
-    metafile = {'metafile': m.content}
-    r = idp['session'].post(url, headers=headers,
-                            data=payload, files=metafile)
-    if r.status_code != 200:
-        print >> sys.stderr, " ERROR: %s" % repr(r)
-        return False
-
-    tree = html.fromstring(r.text)
-    try:
-        alert = parse_first(tree,
-                            '//div[@class="alert alert-success"]/p/text()')
-    except Exception, e:  # pylint: disable=broad-except
-        print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e)
-        return False
-
-    expected = 'SP Successfully added'
-    if alert != expected:
-        print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected,
-                                                                 alert)
-        return False
-
-    return True
-
-
-if __name__ == '__main__':
-    basedir = sys.argv[1]
-
-    idpsrv = {'name': 'idp1',
-              'baseuri': 'http://127.0.0.10:45080',
-              'session': requests.Session(),
-              'user': pwd.getpwuid(os.getuid())[0],
-              'pass': 'ipsilon'}
-    spsrv = {'name': 'sp1',
-             'baseuri': 'http://127.0.0.11:45081',
-             'session': requests.Session()}
-
-    print "test1: Authenticate to IDP ...",
-    if not auth_to_idp(idpsrv):
         sys.exit(1)
     print " SUCCESS"
 
     print "test1: Add SP Metadata to IDP ...",
-    if not add_sp_metadata(idpsrv, spsrv):
+    try:
+        sess.add_sp_metadata(idpname, spname)
+    except Exception, e:  # pylint: disable=broad-except
+        print >> sys.stderr, " ERROR: %s" % repr(e)
         sys.exit(1)
     print " SUCCESS"
 
     print "test1: Access SP Protected Area ...",
-    servers = [idpsrv, spsrv]
-    spurl = '%s/sp/' % (spsrv['baseuri'])
     try:
-        text = go_to_url(servers, idpsrv, spurl, spurl)
+        page = sess.fetch_page(idpname, 'http://127.0.0.11:45081/sp/')
+        page.expected_value('text()', 'WORKS!')
     except ValueError, e:
         print >> sys.stderr, " ERROR: %s" % repr(e)
         sys.exit(1)
-    if text != "WORKS!":
-        print >> sys.stderr, "ERROR: Expected [WORKS!], got [%s]" % text
-        sys.exit(1)
     print " SUCCESS"