143 lines
5.3 KiB
Python
143 lines
5.3 KiB
Python
import flask
|
|
from xmltodict import parse
|
|
from flask import current_app
|
|
from .cas_urls import create_cas_login_url
|
|
from .cas_urls import create_cas_logout_url
|
|
from .cas_urls import create_cas_validate_url
|
|
|
|
|
|
try:
|
|
from urllib import urlopen
|
|
except ImportError:
|
|
from urllib.request import urlopen
|
|
|
|
blueprint = flask.Blueprint('cas', __name__)
|
|
|
|
|
|
@blueprint.route('/login/')
|
|
def login():
|
|
"""
|
|
This route has two purposes. First, it is used by the user
|
|
to login. Second, it is used by the CAS to respond with the
|
|
`ticket` after the user logs in successfully.
|
|
|
|
When the user accesses this url, they are redirected to the CAS
|
|
to login. If the login was successful, the CAS will respond to this
|
|
route with the ticket in the url. The ticket is then validated.
|
|
If validation was successful the logged in username is saved in
|
|
the user's session under the key `CAS_USERNAME_SESSION_KEY` and
|
|
the user's attributes are saved under the key
|
|
'CAS_USERNAME_ATTRIBUTE_KEY'
|
|
"""
|
|
|
|
cas_token_session_key = current_app.config['CAS_TOKEN_SESSION_KEY']
|
|
|
|
redirect_url = create_cas_login_url(
|
|
current_app.config['CAS_SERVER'],
|
|
current_app.config['CAS_LOGIN_ROUTE'],
|
|
flask.url_for('.login', origin=flask.session.get('CAS_AFTER_LOGIN_SESSION_URL'), _external=True))
|
|
|
|
if 'ticket' in flask.request.args:
|
|
flask.session[cas_token_session_key] = flask.request.args['ticket']
|
|
|
|
if cas_token_session_key in flask.session:
|
|
|
|
if validate(flask.session[cas_token_session_key]):
|
|
if 'CAS_AFTER_LOGIN_SESSION_URL' in flask.session:
|
|
redirect_url = flask.session.pop('CAS_AFTER_LOGIN_SESSION_URL')
|
|
elif flask.request.args.get('origin'):
|
|
redirect_url = flask.request.args['origin']
|
|
else:
|
|
redirect_url = flask.url_for(
|
|
current_app.config['CAS_AFTER_LOGIN'])
|
|
else:
|
|
del flask.session[cas_token_session_key]
|
|
|
|
current_app.logger.debug('Redirecting to: {0}'.format(redirect_url))
|
|
|
|
return flask.redirect(redirect_url)
|
|
|
|
|
|
@blueprint.route('/logout/')
|
|
def logout():
|
|
"""
|
|
When the user accesses this route they are logged out.
|
|
"""
|
|
|
|
cas_username_session_key = current_app.config['CAS_USERNAME_SESSION_KEY']
|
|
cas_attributes_session_key = current_app.config['CAS_ATTRIBUTES_SESSION_KEY']
|
|
|
|
if cas_username_session_key in flask.session:
|
|
del flask.session[cas_username_session_key]
|
|
|
|
if cas_attributes_session_key in flask.session:
|
|
del flask.session[cas_attributes_session_key]
|
|
|
|
if(current_app.config['CAS_AFTER_LOGOUT'] is not None):
|
|
redirect_url = create_cas_logout_url(
|
|
current_app.config['CAS_SERVER'],
|
|
current_app.config['CAS_LOGOUT_ROUTE'],
|
|
current_app.config['CAS_AFTER_LOGOUT'])
|
|
else:
|
|
redirect_url = create_cas_logout_url(
|
|
current_app.config['CAS_SERVER'],
|
|
current_app.config['CAS_LOGOUT_ROUTE'])
|
|
|
|
current_app.logger.debug('Redirecting to: {0}'.format(redirect_url))
|
|
return flask.redirect(redirect_url)
|
|
|
|
|
|
def validate(ticket):
|
|
"""
|
|
Will attempt to validate the ticket. If validation fails, then False
|
|
is returned. If validation is successful, then True is returned
|
|
and the validated username is saved in the session under the
|
|
key `CAS_USERNAME_SESSION_KEY` while tha validated attributes dictionary
|
|
is saved under the key 'CAS_ATTRIBUTES_SESSION_KEY'.
|
|
"""
|
|
|
|
cas_username_session_key = current_app.config['CAS_USERNAME_SESSION_KEY']
|
|
cas_attributes_session_key = current_app.config['CAS_ATTRIBUTES_SESSION_KEY']
|
|
|
|
current_app.logger.debug("validating token {0}".format(ticket))
|
|
|
|
cas_validate_url = create_cas_validate_url(
|
|
current_app.config['CAS_SERVER'],
|
|
current_app.config['CAS_VALIDATE_ROUTE'],
|
|
flask.url_for('.login', origin=flask.session.get('CAS_AFTER_LOGIN_SESSION_URL'), _external=True),
|
|
ticket)
|
|
|
|
current_app.logger.debug("Making GET request to {0}".format(
|
|
cas_validate_url))
|
|
|
|
xml_from_dict = {}
|
|
isValid = False
|
|
|
|
try:
|
|
xmldump = urlopen(cas_validate_url).read().strip().decode('utf8', 'ignore')
|
|
xml_from_dict = parse(xmldump)
|
|
isValid = True if "cas:authenticationSuccess" in xml_from_dict["cas:serviceResponse"] else False
|
|
except ValueError:
|
|
current_app.logger.error("CAS returned unexpected result")
|
|
|
|
if isValid:
|
|
current_app.logger.debug("valid")
|
|
xml_from_dict = xml_from_dict["cas:serviceResponse"]["cas:authenticationSuccess"]
|
|
username = xml_from_dict["cas:user"]
|
|
attributes = xml_from_dict.get("cas:attributes", {})
|
|
|
|
if attributes and "cas:memberOf" in attributes:
|
|
try:
|
|
basestring
|
|
except NameError:
|
|
basestring = str
|
|
if isinstance(attributes["cas:memberOf"], basestring):
|
|
attributes["cas:memberOf"] = attributes["cas:memberOf"].lstrip('[').rstrip(']').split(',')
|
|
for group_number in range(0, len(attributes['cas:memberOf'])):
|
|
attributes['cas:memberOf'][group_number] = attributes['cas:memberOf'][group_number].lstrip(' ').rstrip(' ')
|
|
flask.session[cas_username_session_key] = username
|
|
flask.session[cas_attributes_session_key] = attributes
|
|
else:
|
|
current_app.logger.debug("invalid")
|
|
|
|
return isValid
|