django-allauth-ens/allauth_ens/adapter.py

257 lines
8.6 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
2018-04-22 20:13:42 +02:00
from django.conf import settings
from django.contrib.auth import get_user_model
from allauth.account.models import EmailAddress
from allauth.account.utils import user_email, user_field, user_username
from allauth.socialaccount.adapter import (
DefaultSocialAccountAdapter, get_account_adapter, get_adapter,
)
from allauth.socialaccount.models import SocialAccount
2018-04-22 20:13:42 +02:00
import ldap
# User model as returned by Django's get_user_model(), resolved once when the
# module is imported.
User = get_user_model()
2018-04-28 16:26:52 +02:00
# Display name of each department, keyed by the lower-cased department
# component found in a user's LDAP homeDirectory
# ("/users/<year>/<department>/...").
DEPARTMENTS_LIST = {
    'phy': u'Physique',
    'maths': u'Maths',
    'bio': u'Biologie',
    'chimie': u'Chimie',
    'geol': u'Géosciences',
    'dec': u'DEC',
    'info': u'Informatique',
    'litt': u'Littéraire',
    'guests': u'Pensionnaires étrangers',
    'pei': u'PEI',
}
2018-04-22 20:13:42 +02:00
def _init_ldap():
    """
    Create and configure the LDAP object used to query the directory.

    The server URI comes from ``settings.LDAP_SERVER`` and defaults to
    ``ldaps://ldap.spi.ens.fr:636``. Returns the object produced by
    ``ldap.initialize`` once all options below have been applied to it.
    """
    uri = getattr(settings, "LDAP_SERVER", "ldaps://ldap.spi.ens.fr:636")

    # Module-wide option, set before the connection object is created.
    # NOTE(review): OPT_X_TLS_NEVER presumably disables verification of the
    # server certificate -- confirm this is really intended.
    ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)

    connection = ldap.initialize(uri)

    # Applied in this exact order on the connection object.
    # NOTE(review): OPT_X_TLS_DEMAND is used both as a value and as an option
    # name below, and the debug level is set to 255 -- confirm both are
    # wanted.
    connection_options = (
        (ldap.OPT_REFERRALS, 0),
        (ldap.OPT_PROTOCOL_VERSION, 3),
        (ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND),
        (ldap.OPT_X_TLS_DEMAND, True),
        (ldap.OPT_DEBUG_LEVEL, 255),
        (ldap.OPT_NETWORK_TIMEOUT, 10),
        (ldap.OPT_TIMEOUT, 10),
    )
    for option, value in connection_options:
        connection.set_option(option, value)
    return connection
def _extract_infos_from_ldap(infos, data={}):
# Name
2018-04-28 16:26:52 +02:00
data['name'] = infos.get('cn', [b''])[0].decode("utf-8")
# Parsing homeDirectory to get entrance year and departments
annee = '00'
promotion = 'Inconnue'
if 'homeDirectory' in infos:
dirs = infos['homeDirectory'][0].decode("utf-8").split('/')
2018-04-28 16:26:52 +02:00
if len(dirs) >= 4 and dirs[1] == 'users':
# Assume template "/users/<year>/<department>/clipper/"
annee = dirs[2]
dep = dirs[3]
2018-04-28 16:26:52 +02:00
dep = DEPARTMENTS_LIST.get(dep.lower(), '')
promotion = u'%s %s' % (dep, annee)
data['annee'] = annee
data['promotion'] = promotion
# Mail
pmail = infos.get('mailRoutingAddress', [])
2018-04-28 16:26:52 +02:00
if pmail:
data['email'] = pmail[0].decode("utf-8")
return data
def get_ldap_infos(clipper):
    """
    Look up a clipper login in the LDAP directory.

    :param clipper: the login to search for; must be alphanumeric because it
        is interpolated into the LDAP search filter.
    :return: the dict built by ``_extract_infos_from_ldap`` for the first
        matching entry, or an empty dict when nothing matches or the LDAP
        query fails.
    :raises ValueError: if ``clipper`` is not alphanumeric.
    """
    import logging

    # Explicit check rather than `assert`: assertions are stripped when
    # Python runs with -O, which would let filter metacharacters through.
    if not clipper.isalnum():
        raise ValueError("Invalid clipper login: %r" % (clipper,))

    data = {}
    try:
        conn = _init_ldap()
        try:
            results = conn.search_s('dc=spi,dc=ens,dc=fr',
                                    ldap.SCOPE_SUBTREE,
                                    '(uid=%s)' % (clipper,),
                                    ['cn',
                                     'mailRoutingAddress',
                                     'homeDirectory'])
            if results:
                data = _extract_infos_from_ldap(results[0][1], data)
        finally:
            # Release the connection whatever happened above.
            conn.unbind_s()
    except ldap.LDAPError:
        # Best effort: callers get whatever was collected (usually nothing),
        # but the failure is no longer invisible.
        logging.getLogger(__name__).warning(
            "LDAP lookup failed for clipper %s", clipper, exc_info=True)
    return data
class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter):
    """
    Social account adapter coping with the limited lifetime of clipper
    logins: people lose their account at the end of their studies, and the
    same login may later be reused for somebody else.
    """

    def pre_social_login(self, request, sociallogin):
        """
        Reconcile a clipper login with a previously deprecated account.

        Logins from other providers are passed to the parent adapter. For a
        clipper login, nothing happens unless a 'clipper_inactive' account
        with the same uid exists. If it does, the LDAP data decides whether
        it belongs to the same person (the account is reactivated) or to
        somebody else (the former owner's clipper email is dropped).
        """
        if sociallogin.account.provider != "clipper":
            return super(LongTermClipperAccountAdapter,
                         self).pre_social_login(request, sociallogin)

        clipper = sociallogin.account.uid
        try:
            old_account = SocialAccount.objects.get(
                provider='clipper_inactive', uid=clipper)
        except SocialAccount.DoesNotExist:
            return

        # An account with that uid was registered, but potentially
        # deprecated at the beginning of the year: check whether the user
        # is still the same as before. The LDAP data is kept on the
        # sociallogin so that save_user() can reuse it.
        ldap_data = get_ldap_infos(clipper)
        sociallogin._ldap_data = ldap_data

        expected_username = self.get_username(clipper, ldap_data)
        if old_account.user.username == expected_username:
            # Same admission year: revive the previous SocialAccount and
            # redo the lookup that had failed just before.
            old_account.provider = 'clipper'
            old_account.save()
            sociallogin.lookup()
            return

        # Different admission year: a new SocialAccount is needed, but the
        # clipper email address of the previous user has to be invalidated
        # first.
        fallback_email = '{}@clipper.ens.fr'.format(clipper.strip().lower())
        self._release_clipper_email(
            old_account.user, ldap_data.get('email', fallback_email))

    def _release_clipper_email(self, user, email):
        """
        Delete ``email`` from the addresses of ``user``, if registered.

        When that address is the primary one, another address is promoted
        to primary if one exists; otherwise the user's email field is
        cleared and the user saved.
        """
        addresses = EmailAddress.objects.filter(user=user)
        try:
            stale = addresses.get(email=email)
            if stale.primary:
                others = addresses.filter(primary=False)
                if others.exists():
                    others[0].set_as_primary()
                else:
                    user_email(user, '')
                    user.save()
            stale.delete()
        except EmailAddress.DoesNotExist:
            pass

    def get_username(self, clipper, data):
        """
        Build the username for a clipper, 'clipper@year' by default.

        It is used to disambiguate users and to recognize whether the
        person behind a login is still the same. Raises ValueError when
        ``data`` is None or carries no 'annee' entry.
        """
        has_year = data is not None and 'annee' in data
        if not has_year:
            raise ValueError("No entrance year in LDAP data")
        return "{}@{}".format(clipper, data['annee'])

    def save_user(self, request, sociallogin, form=None):
        """
        Fill in and save the user of a new clipper login from LDAP data.

        Logins from other providers are passed to the parent adapter. The
        ``form`` argument is ignored for clipper logins. Returns the user.
        """
        if sociallogin.account.provider != "clipper":
            return super(LongTermClipperAccountAdapter,
                         self).save_user(request, sociallogin, form)

        user = sociallogin.user
        user.set_unusable_password()

        clipper = sociallogin.account.uid
        # Reuse the LDAP data fetched by pre_social_login() when available.
        if hasattr(sociallogin, '_ldap_data'):
            ldap_data = sociallogin._ldap_data
        else:
            ldap_data = get_ldap_infos(clipper)

        username = self.get_username(clipper, ldap_data)
        fallback_email = '{}@clipper.ens.fr'.format(clipper.strip().lower())
        email = ldap_data.get('email', fallback_email)
        full_name = ldap_data.get('name') or ''

        user_username(user, username or '')
        user_email(user, email or '')
        # Everything up to the first space is the first name, the rest is
        # the last name.
        first_name, _, last_name = full_name.partition(' ')
        user_field(user, 'first_name', first_name)
        user_field(user, 'last_name', last_name)

        # Ignore form
        get_account_adapter().populate_username(request, user)

        # Save extra data (only once)
        sociallogin.extra_data = ldap_data
        sociallogin.account.extra_data = ldap_data
        sociallogin.save(request)
        sociallogin.account.save()

        return user
def deprecate_clippers():
    """
    Flag every clipper SocialAccount as deprecated by switching its
    provider to 'clipper_inactive'.
    """
    active = SocialAccount.objects.filter(provider='clipper')

    # Clear old clipper accounts that were replaced by new ones
    # (to avoid conflicts)
    superseded = SocialAccount.objects.filter(
        provider='clipper_inactive',
        uid__in=active.values_list('uid', flat=True))
    superseded.delete()

    # Deprecate accounts
    active.update(provider='clipper_inactive')
def install_longterm_adapter(fake=False):
    """
    Manages the transition from an older django_cas or an allauth_ens
    installation without LongTermClipperAccountAdapter.

    Every user whose username is alphanumeric is looked up in LDAP by uid.
    Users found there are renamed with the adapter's ``get_username`` and
    get a 'clipper' SocialAccount if they have none yet.

    :param fake: when true, nothing is written to the database; the
        returned logs describe what would have been done.
    :return: a dict with ``created`` and ``updated`` (lists of
        ``(clipper, new username)`` tuples) and ``unmodified`` (usernames of
        the users that were not handled).
    """
    accounts = {u.username: u for u in User.objects.all()
                if u.username.isalnum()}
    ltc_adapter = get_adapter()

    info = []
    if accounts:
        # Only query when there is something to look up; this also avoids
        # sending the degenerate "(|)" filter.
        ldap_filter = "(|{})".format(
            ''.join("(uid=%s)" % (un,) for un in accounts))
        conn = _init_ldap()
        try:
            info = conn.search_s('dc=spi,dc=ens,dc=fr',
                                 ldap.SCOPE_SUBTREE,
                                 ldap_filter,
                                 ['uid',
                                  'cn',
                                  'mailRoutingAddress',
                                  'homeDirectory'])
        finally:
            # Release the connection whether or not the search succeeded.
            conn.unbind_s()

    logs = {"created": [], "updated": []}
    cases = []

    for userinfo in info:
        infos = userinfo[1]
        # Pass a fresh dict so each user gets its own data, independent of
        # any other entry processed in this loop.
        data = _extract_infos_from_ldap(infos, {})

        # Attribute values may come back as bytes; usernames are text, so
        # decode before using the uid as a lookup key.
        clipper = infos["uid"][0]
        if isinstance(clipper, bytes):
            clipper = clipper.decode("utf-8")

        user = accounts.get(clipper, None)
        if user is None:
            continue

        user.username = ltc_adapter.get_username(clipper, data)
        if fake:
            # The rename is not saved, so the database still holds the old
            # username (the clipper itself).
            cases.append(clipper)
        else:
            user.save()
            cases.append(user.username)

        if SocialAccount.objects.filter(provider='clipper',
                                        uid=clipper).exists():
            logs["updated"].append((clipper, user.username))
            continue

        sa = SocialAccount(user=user, provider='clipper',
                           uid=clipper, extra_data=data)
        if not fake:
            sa.save()
        logs["created"].append((clipper, user.username))

    logs["unmodified"] = User.objects.exclude(username__in=cases)\
                                     .values_list("username", flat=True)
    return logs