95 lines
3 KiB
Python
95 lines
3 KiB
Python
from django.contrib.auth import get_user_model
|
|
from django.db import transaction
|
|
|
|
from authens.models import Clipper
|
|
from authens.utils import get_cas_client
|
|
|
|
UserModel = get_user_model()
|
|
|
|
|
|
class ENSCASError(Exception):
|
|
pass
|
|
|
|
|
|
def get_entrance_year(attributes):
|
|
"""Infer the entrance year of a clipper account holder from its home directory."""
|
|
|
|
home_dir = attributes.get("homeDirectory")
|
|
if home_dir is None:
|
|
raise ENSCASError("Entrance year not available")
|
|
|
|
dirs = home_dir.split("/")
|
|
if len(dirs) < 3 or not dirs[2].isdecimal():
|
|
raise ENSCASError("Invalid homeDirectory: {}".format(home_dir))
|
|
|
|
year = int(dirs[2])
|
|
# This will break in 2080.
|
|
if year >= 80:
|
|
return 1900 + year
|
|
else:
|
|
return 2000 + year
|
|
|
|
|
|
def find_available_username(clipper_uid):
|
|
"""Find an available username 'close' to a clipper uid."""
|
|
|
|
taken = UserModel.objects.filter(username__startswith=clipper_uid).values_list(
|
|
"username", flat=True
|
|
)
|
|
if clipper_uid not in taken:
|
|
return clipper_uid
|
|
else:
|
|
i = 2
|
|
while clipper_uid + str(i) in taken:
|
|
i += 1
|
|
return clipper_uid + str(i)
|
|
|
|
|
|
class ENSCASBackend:
|
|
"""ENSAuth authentication backend.
|
|
|
|
Implement standard CAS v3 authentication and handles username clashes with non-CAS
|
|
accounts and potential old CAS accounts.
|
|
|
|
Every user connecting via CAS is given a `authens.models.Clipper` instance which
|
|
remembers her clipper login and her entrance year (the year her clipper account was
|
|
created).
|
|
At each connection, we search for a Clipper account with the given clipper login
|
|
(uid) and create one if none exists. In case the Clipper account's entrance year
|
|
does not match the entrance year given by CAS, it means it is a old account and it
|
|
must be deleted. The corresponding user can still connect using regular Django
|
|
authentication.
|
|
"""
|
|
|
|
def authenticate(self, request, ticket=None):
|
|
cas_client = get_cas_client(request)
|
|
uid, attributes, _ = cas_client.verify_ticket(ticket)
|
|
|
|
if not uid:
|
|
# Authentication failed
|
|
return None
|
|
|
|
year = get_entrance_year(attributes)
|
|
return self._get_or_create(uid, year)
|
|
|
|
def _get_or_create(self, uid, entrance_year):
|
|
with transaction.atomic():
|
|
try:
|
|
user = UserModel.objects.get(clipper__uid=uid)
|
|
if user.clipper.entrance_year != entrance_year:
|
|
user.clipper.delete()
|
|
user = None
|
|
except UserModel.DoesNotExist:
|
|
user = None
|
|
|
|
if user is None:
|
|
username = find_available_username(uid)
|
|
user = UserModel.objects.create_user(username=username)
|
|
Clipper.objects.create(user=user, entrance_year=entrance_year, uid=uid)
|
|
return user
|
|
|
|
def get_user(self, user_id):
|
|
try:
|
|
return UserModel.objects.get(pk=user_id)
|
|
except UserModel.DoesNotExist:
|
|
return None
|