From 9a651e44ce34e092708b9a0df40de3f112132199 Mon Sep 17 00:00:00 2001 From: Simo Sorce Date: Sun, 15 Jun 2014 17:46:47 -0400 Subject: [PATCH] Move parsing code into helpers module This way common test actions can be easily reused by multiple tests. Signed-off-by: Simo Sorce --- tests/helpers/__init__.py | 0 tests/helpers/http.py | 241 +++++++++++++++++++++++++++++++++++++ tests/test1.py | 243 +++----------------------------------- 3 files changed, 258 insertions(+), 226 deletions(-) create mode 100644 tests/helpers/__init__.py create mode 100755 tests/helpers/http.py diff --git a/tests/helpers/__init__.py b/tests/helpers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/helpers/http.py b/tests/helpers/http.py new file mode 100755 index 0000000..425d6b7 --- /dev/null +++ b/tests/helpers/http.py @@ -0,0 +1,241 @@ +#!/usr/bin/python +# +# Copyright (C) 2014 Simo Sorce +# +# see file 'COPYING' for use and warranty information +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +from lxml import html +import requests +import string +import urlparse + + +class WrongPage(Exception): + pass + + +class PageTree(object): + + def __init__(self, result): + self.result = result + self.text = result.text + self._tree = None + + @property + def tree(self): + if not self._tree: + self._tree = html.fromstring(self.text) + return self._tree + + def first_value(self, rule): + result = self.tree.xpath(rule) + if type(result) is list: + if len(result) > 0: + result = result[0] + else: + result = None + return result + + def all_values(self, rule): + result = self.tree.xpath(rule) + if type(result) is list: + return result + return [result] + + def make_referer(self): + return self.result.url + + def expected_value(self, rule, expected): + value = self.first_value(rule) + if value != expected: + raise ValueError("Expected [%s], got [%s]" % (expected, value)) + + +class HttpSessions(object): + + def __init__(self): + self.servers = dict() + + def add_server(self, name, baseuri, user=None, pwd=None): + new = {'baseuri': baseuri, + 'session': requests.Session()} + if user: + new['user'] = user + if pwd: + new['pwd'] = pwd + self.servers[name] = new + + def get_session(self, url): + for srv in self.servers: + d = self.servers[srv] + if url.startswith(d['baseuri']): + return d['session'] + + raise ValueError("Unknown URL: %s" % url) + + def get(self, url, **kwargs): + session = self.get_session(url) + return session.get(url, allow_redirects=False, **kwargs) + + def post(self, url, **kwargs): + session = self.get_session(url) + return session.post(url, allow_redirects=False, **kwargs) + + def access(self, action, url, **kwargs): + action = string.lower(action) + if action == 'get': + return self.get(url, **kwargs) + elif action == 'post': + return self.post(url, **kwargs) + else: + raise ValueError("Unknown action type: [%s]" % action) + + def new_url(self, referer, action): + if action.startswith('/'): + u = urlparse.urlparse(referer) + return '%s://%s%s' % (u.scheme, u.netloc, action) + return action + + def get_form_data(self, page, form_id, input_fields): + values = [] + action = page.first_value('//form[@id="%s"]/@action' % form_id) + values.append(action) + method = page.first_value('//form[@id="%s"]/@method' % form_id) + values.append(method) + for field in input_fields: + value = page.all_values('//form[@id="%s"]/input/@%s' % (form_id, + field)) + values.append(value) + return values + + def handle_login_form(self, idp, page): + if type(page) != PageTree: + raise TypeError("Expected PageTree object") + + srv = self.servers[idp] + + try: + results = self.get_form_data(page, "login_form", []) + action_url = results[0] + method = results[1] + if action_url is None: + raise Exception + except Exception: # pylint: disable=broad-except + raise WrongPage("Not a Login Form Page") + + referer = page.make_referer() + headers = {'referer': referer} + payload = {'login_name': srv['user'], + 'login_password': srv['pwd']} + + return [method, self.new_url(referer, action_url), + {'headers': headers, 'data': payload}] + + def handle_return_form(self, page): + if type(page) != PageTree: + raise TypeError("Expected PageTree object") + + try: + results = self.get_form_data(page, "saml-response", + ["name", "value"]) + action_url = results[0] + if action_url is None: + raise Exception + method = results[1] + names = results[2] + values = results[3] + except Exception: # pylint: disable=broad-except + raise WrongPage("Not a Return Form Page") + + referer = page.make_referer() + headers = {'referer': referer} + + payload = {} + for i in range(0, len(names)): + payload[names[i]] = values[i] + + return [method, self.new_url(referer, action_url), + {'headers': headers, 'data': payload}] + + def fetch_page(self, idp, target_url): + url = target_url + action = 'get' + args = {} + + while True: + r = self.access(action, url, **args) # pylint: disable=star-args + if r.status_code == 303: + url = r.headers['location'] + action = 'get' + args = {} + elif r.status_code == 200: + page = PageTree(r) + + try: + (action, url, args) = self.handle_login_form(idp, page) + continue + except WrongPage: + pass + + try: + (action, url, args) = self.handle_return_form(page) + continue + except WrongPage: + pass + + # Either we got what we wanted, or we have to stop anyway + return page + else: + raise ValueError("Unhandled status (%d) on url %s" % ( + r.status_code, url)) + + def auth_to_idp(self, idp): + + srv = self.servers[idp] + target_url = '%s/%s/' % (srv['baseuri'], idp) + + r = self.access('get', target_url) + if r.status_code != 200: + raise ValueError("Access to idp failed: %s" % repr(r)) + + page = PageTree(r) + page.expected_value('//div[@id="content"]/p/a/text()', 'Log In') + href = page.first_value('//div[@id="content"]/p/a/@href') + url = self.new_url(target_url, href) + + page = self.fetch_page(idp, url) + page.expected_value('//div[@id="welcome"]/p/text()', + 'Welcome %s!' % srv['user']) + + def add_sp_metadata(self, idp, sp): + idpsrv = self.servers[idp] + idpuri = idpsrv['baseuri'] + spuri = self.servers[sp]['baseuri'] + + url = '%s/%s/admin/providers/saml2/admin/new' % (idpuri, idp) + headers = {'referer': url} + payload = {'name': sp} + m = requests.get('%s/saml2/metadata' % spuri) + metafile = {'metafile': m.content} + r = idpsrv['session'].post(url, headers=headers, + data=payload, files=metafile) + if r.status_code != 200: + raise ValueError('Failed to post SP data [%s]' % repr(r)) + + page = PageTree(r) + page.expected_value('//div[@class="alert alert-success"]/p/text()', + 'SP Successfully added') diff --git a/tests/test1.py b/tests/test1.py index 6e76f88..411ac6e 100755 --- a/tests/test1.py +++ b/tests/test1.py @@ -18,253 +18,44 @@ # along with this program. If not, see . -from lxml import html +from helpers import http # pylint: disable=relative-import import os import pwd -import requests import sys -import urlparse -def get_session(srvs, url): - for srv in srvs: - if url.startswith(srv['baseuri']): - return srv['session'] - - raise ValueError("Unknown URL: %s" % url) - - -def get_url(srvs, url, **kwargs): - session = get_session(srvs, url) - return session.get(url, allow_redirects=False, **kwargs) - - -def post_url(srvs, url, **kwargs): - session = get_session(srvs, url) - return session.post(url, allow_redirects=False, **kwargs) - - -def access_url(action, srvs, url, **kwargs): - if action == 'get': - return get_url(srvs, url, **kwargs) - elif action == 'post': - return post_url(srvs, url, **kwargs) - else: - raise ValueError("Unknown action type: [%s]" % action) - - -def get_new_url(referer, action): - if action.startswith('/'): - u = urlparse.urlparse(referer) - return '%s://%s%s' % (u.scheme, u.netloc, action) - return action - - -def parse_first(tree, rule): - result = tree.xpath(rule) - if type(result) is list: - if len(result) > 0: - result = result[0] - else: - result = None - return result - - -def parse_list(tree, rule): - result = tree.xpath(rule) - if type(result) is list: - return result - return [result] - - -def handle_login_form(idp, r): - tree = html.fromstring(r.text) - try: - action_url = parse_first(tree, '//form[@id="login_form"]/@action') - method = parse_first(tree, '//form[@id="login_form"]/@method') - except Exception: # pylint: disable=broad-except - return [] - - if action_url is None: - return [] - - headers = {'referer': r.url} - payload = {'login_name': idp['user'], - 'login_password': idp['pass']} - - return [method, - get_new_url(r.url, action_url), - {'headers': headers, 'data': payload}] - - -def handle_return_form(r): - tree = html.fromstring(r.text) - try: - action_url = parse_first(tree, '//form[@id="saml-response"]/@action') - method = parse_first(tree, '//form[@id="saml-response"]/@method') - names = parse_list(tree, '//form[@id="saml-response"]/input/@name') - values = parse_list(tree, '//form[@id="saml-response"]/input/@value') - except Exception: # pylint: disable=broad-except - return [] - - if action_url is None: - return [] - - headers = {'referer': r.url} - payload = {} - for i in range(0, len(names)): - payload[names[i]] = values[i] - - return [method, - get_new_url(r.url, action_url), - {'headers': headers, 'data': payload}] - - -def go_to_url(srvs, idp, start_url, target_url): - - url = start_url - action = 'get' - args = {} - - good = True - while good: - r = access_url(action, srvs, url, **args) # pylint: disable=star-args - if r.status_code == 303: - url = r.headers['location'] - action = 'get' - args = {} - elif r.status_code == 200: - if url == target_url: - return r.text - - result = handle_login_form(idp, r) - if result: - action = result[0] - url = result[1] - args = result[2] - continue - - result = handle_return_form(r) - if result: - action = result[0] - url = result[1] - args = result[2] - continue - - raise ValueError("Unhandled Success code at url %s" % url) - - else: - good = False - - raise ValueError("Unhandled status (%d) on url %s" % (r.status_code, url)) - - -def auth_to_idp(idp): - - target_url = '%s/%s/' % (idp['baseuri'], idp['name']) - srvs = [idp] +if __name__ == '__main__': + basedir = sys.argv[1] - r = access_url('get', srvs, target_url) - if r.status_code != 200: - print >> sys.stderr, " ERROR: Access to idp failed: %s" % repr(r) - return False + idpname = 'idp1' + spname = 'sp1' + user = pwd.getpwuid(os.getuid())[0] - tree = html.fromstring(r.text) - try: - expected = 'Log In' - login = parse_first(tree, '//div[@id="content"]/p/a/text()') - if login != expected: - print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected, - login) - href = parse_first(tree, '//div[@id="content"]/p/a/@href') - start_url = get_new_url(target_url, href) - except Exception, e: # pylint: disable=broad-except - print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e) - return False + sess = http.HttpSessions() + sess.add_server(idpname, 'http://127.0.0.10:45080', user, 'ipsilon') + sess.add_server(spname, 'http://127.0.0.11:45081') + print "test1: Authenticate to IDP ...", try: - page = go_to_url(srvs, idp, start_url, target_url) + sess.auth_to_idp(idpname) except Exception, e: # pylint: disable=broad-except print >> sys.stderr, " ERROR: %s" % repr(e) - return False - - tree = html.fromstring(page) - try: - welcome = parse_first(tree, '//div[@id="welcome"]/p/text()') - except Exception, e: # pylint: disable=broad-except - print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e) - return False - - expected = 'Welcome %s!' % idp['user'] - if welcome != expected: - print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected, - welcome) - return False - - return True - - -def add_sp_metadata(idp, sp): - url = '%s/%s/admin/providers/saml2/admin/new' % (idp['baseuri'], - idp['name']) - headers = {'referer': url} - payload = {'name': sp['name']} - m = requests.get('%s/saml2/metadata' % sp['baseuri']) - metafile = {'metafile': m.content} - r = idp['session'].post(url, headers=headers, - data=payload, files=metafile) - if r.status_code != 200: - print >> sys.stderr, " ERROR: %s" % repr(r) - return False - - tree = html.fromstring(r.text) - try: - alert = parse_first(tree, - '//div[@class="alert alert-success"]/p/text()') - except Exception, e: # pylint: disable=broad-except - print >> sys.stderr, " ERROR: Unexpected reply [%s]" % repr(e) - return False - - expected = 'SP Successfully added' - if alert != expected: - print >> sys.stderr, " ERROR: Expected [%s] got [%s]" % (expected, - alert) - return False - - return True - - -if __name__ == '__main__': - basedir = sys.argv[1] - - idpsrv = {'name': 'idp1', - 'baseuri': 'http://127.0.0.10:45080', - 'session': requests.Session(), - 'user': pwd.getpwuid(os.getuid())[0], - 'pass': 'ipsilon'} - spsrv = {'name': 'sp1', - 'baseuri': 'http://127.0.0.11:45081', - 'session': requests.Session()} - - print "test1: Authenticate to IDP ...", - if not auth_to_idp(idpsrv): sys.exit(1) print " SUCCESS" print "test1: Add SP Metadata to IDP ...", - if not add_sp_metadata(idpsrv, spsrv): + try: + sess.add_sp_metadata(idpname, spname) + except Exception, e: # pylint: disable=broad-except + print >> sys.stderr, " ERROR: %s" % repr(e) sys.exit(1) print " SUCCESS" print "test1: Access SP Protected Area ...", - servers = [idpsrv, spsrv] - spurl = '%s/sp/' % (spsrv['baseuri']) try: - text = go_to_url(servers, idpsrv, spurl, spurl) + page = sess.fetch_page(idpname, 'http://127.0.0.11:45081/sp/') + page.expected_value('text()', 'WORKS!') except ValueError, e: print >> sys.stderr, " ERROR: %s" % repr(e) sys.exit(1) - if text != "WORKS!": - print >> sys.stderr, "ERROR: Expected [WORKS!], got [%s]" % text - sys.exit(1) print " SUCCESS" -- 2.20.1