+ helper to create CASAccounts for existing users
This commit is contained in:
parent
6e51e555f6
commit
cf4d80da13
4 changed files with 97 additions and 28 deletions
|
@ -2,7 +2,7 @@ from django.contrib.auth import get_user_model
|
|||
from django.db import transaction
|
||||
|
||||
from authens.models import CASAccount, OldCASAccount
|
||||
from authens.utils import get_cas_client
|
||||
from authens.utils import get_cas_client, parse_entrance_year
|
||||
|
||||
UserModel = get_user_model()
|
||||
|
||||
|
@ -11,30 +11,6 @@ class ENSCASError(Exception):
|
|||
pass
|
||||
|
||||
|
||||
def get_entrance_year(attributes):
|
||||
"""Infer the entrance year of a CAS account holder from her home directory."""
|
||||
|
||||
# The home directory of a user is of the form /users/YEAR/DEPARTMENT/CAS_LOGIN where
|
||||
# YEAR is a 2-digit number representing the entrance year of the student. We get the
|
||||
# entrance year from there.
|
||||
|
||||
home_dir = attributes.get("homeDirectory")
|
||||
if home_dir is None:
|
||||
raise ENSCASError("Entrance year not available")
|
||||
|
||||
dirs = home_dir.split("/")
|
||||
if len(dirs) < 3 or not dirs[2].isdecimal():
|
||||
raise ENSCASError("Invalid homeDirectory: {}".format(home_dir))
|
||||
|
||||
# Expand the 2-digit entrance year into 4 digits.
|
||||
# This will break in 2080.
|
||||
year = int(dirs[2])
|
||||
if year >= 80:
|
||||
return 1900 + year
|
||||
else:
|
||||
return 2000 + year
|
||||
|
||||
|
||||
class ENSCASBackend:
|
||||
"""AuthENS CAS authentication backend.
|
||||
|
||||
|
@ -106,8 +82,10 @@ class ENSCASBackend:
|
|||
- If a matching CAS account exists, retrieve it.
|
||||
"""
|
||||
|
||||
entrance_year = get_entrance_year(attributes)
|
||||
email = attributes.get("email", None)
|
||||
email = attributes.get("email")
|
||||
entrance_year = parse_entrance_year(attributes.get("homeDirectory"))
|
||||
if entrance_year is None:
|
||||
raise ENSCASError("Entrance year not available")
|
||||
|
||||
with transaction.atomic():
|
||||
try:
|
||||
|
|
4
authens/conf.py
Normal file
4
authens/conf.py
Normal file
|
@ -0,0 +1,4 @@
|
|||
default_settings = {
|
||||
"LDAP_SERVER_URL": "ldaps://ldap.spi.ens.fr:636",
|
||||
# TODO: CAS_SERVER_URL
|
||||
}
|
62
authens/shortcuts.py
Normal file
62
authens/shortcuts.py
Normal file
|
@ -0,0 +1,62 @@
|
|||
"""Helper functions to get CAS metadata and create CAS accounts."""
|
||||
|
||||
# TODO: make the python-ldap dependency optional
|
||||
import ldap
|
||||
|
||||
from django.conf import settings
|
||||
from django.contrib.auth import get_user_model
|
||||
|
||||
from authens.conf import default_settings
|
||||
from authens.models import CASAccount, OldCASAccount
|
||||
from authens.utils import parse_entrance_year
|
||||
|
||||
User = get_user_model()
|
||||
|
||||
|
||||
def _extract_ldap_info(entry, field):
|
||||
dn, attrs = entry
|
||||
return attrs[field][0].decode("utf-8")
|
||||
|
||||
|
||||
def fetch_cas_account(cas_login):
|
||||
"""Issue an LDAP connection to retrieve metadata associated to a CAS account."""
|
||||
|
||||
# Don't trust the user! Only accept alphanumeric account names.
|
||||
if not cas_login.isalnum():
|
||||
raise ValueError("Illegal CAS login: {}".format(cas_login))
|
||||
|
||||
ldap_url = getattr(settings, "LDAP_SERVER_URL", default_settings["LDAP_SERVER_URL"])
|
||||
ldap_obj = ldap.initialize(ldap_url)
|
||||
res = ldap_obj.search_s(
|
||||
"dc=spi,dc=ens,dc=fr",
|
||||
ldap.SCOPE_SUBTREE,
|
||||
"(uid={})".format(cas_login),
|
||||
["uid", "cn", "homeDirectory"],
|
||||
)
|
||||
if not res:
|
||||
return None
|
||||
|
||||
(res,) = res
|
||||
assert _extract_ldap_info(res, "uid") == cas_login
|
||||
return {
|
||||
"cn": _extract_ldap_info(res, "cn"),
|
||||
"entrance_year": parse_entrance_year(_extract_ldap_info(res, "homeDirectory")),
|
||||
}
|
||||
|
||||
|
||||
def register_cas_account(user: User, cas_login: str) -> CASAccount:
|
||||
"""Register a user as a CAS user and return the newly created CASAccount."""
|
||||
|
||||
if not cas_login:
|
||||
raise ValueError("cas_login must be non-empty")
|
||||
if CASAccount.objects.filter(cas_login=cas_login).exists():
|
||||
raise ValueError("A CAS account named '{}' exists already".format(cas_login))
|
||||
if CASAccount.objects.filter(user=user).exists():
|
||||
raise ValueError("User '{}' already has a CAS account".format(user))
|
||||
if OldCASAccount.objects.filter(user=user).exists():
|
||||
raise ValueError("User '{}' has an old CAS account".format(user))
|
||||
|
||||
entrance_year = fetch_cas_account(cas_login)["entrance_year"]
|
||||
return CASAccount.objects.create(
|
||||
user=user, cas_login=cas_login, entrance_year=entrance_year
|
||||
)
|
|
@ -1,3 +1,5 @@
|
|||
"""Internal utility functions used by authens."""
|
||||
|
||||
from cas import CASClient
|
||||
from urllib.parse import urlunparse
|
||||
|
||||
|
@ -11,3 +13,26 @@ def get_cas_client(request):
|
|||
),
|
||||
server_url="https://cas.eleves.ens.fr/",
|
||||
)
|
||||
|
||||
|
||||
def parse_entrance_year(home_dir):
|
||||
"""Infer the entrance year of a CAS account from their home directory."""
|
||||
|
||||
# The home directory of a user is of the form /users/YEAR/DEPARTMENT/CAS_LOGIN where
|
||||
# YEAR is a 2-digit number representing the entrance year of the student. We get the
|
||||
# entrance year from there.
|
||||
|
||||
if home_dir is None:
|
||||
return None
|
||||
|
||||
dirs = home_dir.split("/")
|
||||
if len(dirs) < 3 or not dirs[2].isdecimal() or dirs[1] != "users":
|
||||
raise ValueError("Invalid home directory: {}".format(home_dir))
|
||||
|
||||
# Expand the 2-digit entrance year into 4 digits.
|
||||
# This will break in 2080.
|
||||
year = int(dirs[2])
|
||||
if year >= 80:
|
||||
return 1900 + year
|
||||
else:
|
||||
return 2000 + year
|
||||
|
|
Loading…
Reference in a new issue