More readable and organized code

Working from Aurelien's code reviews
This commit is contained in:
Evarin 2018-06-03 22:10:34 +02:00
parent 787efe96d0
commit 17fef409a8
4 changed files with 200 additions and 135 deletions

View file

@ -1,9 +1,7 @@
# -*- coding: utf-8 -*-
from django.conf import settings
from django.contrib.auth import get_user_model
from allauth.account.models import EmailAddress
from allauth.account.utils import user_email, user_field, user_username
from allauth.socialaccount.adapter import (
DefaultSocialAccountAdapter, get_account_adapter, get_adapter,
@ -12,85 +10,10 @@ from allauth.socialaccount.models import SocialAccount
import ldap
from .utils import extract_infos_from_ldap, get_ldap_infos, get_clipper_email, remove_email, init_ldap
User = get_user_model()
DEPARTMENTS_LIST = {
'phy': u'Physique',
'maths': u'Maths',
'bio': u'Biologie',
'chimie': u'Chimie',
'geol': u'Géosciences',
'dec': u'DEC',
'info': u'Informatique',
'litt': u'Littéraire',
'guests': u'Pensionnaires étrangers',
'pei': u'PEI',
}
def _init_ldap():
server = getattr(settings, "LDAP_SERVER", "ldaps://ldap.spi.ens.fr:636")
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT,
ldap.OPT_X_TLS_NEVER)
l = ldap.initialize(server)
l.set_option(ldap.OPT_REFERRALS, 0)
l.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
l.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND)
l.set_option(ldap.OPT_X_TLS_DEMAND, True)
l.set_option(ldap.OPT_DEBUG_LEVEL, 255)
l.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
l.set_option(ldap.OPT_TIMEOUT, 10)
return l
def _extract_infos_from_ldap(infos, data={}):
# Name
data['name'] = infos.get('cn', [b''])[0].decode("utf-8")
# Parsing homeDirectory to get entrance year and departments
annee = '00'
promotion = 'Inconnue'
if 'homeDirectory' in infos:
dirs = infos['homeDirectory'][0].decode("utf-8").split('/')
if len(dirs) >= 4 and dirs[1] == 'users':
# Assume template "/users/<year>/<department>/clipper/"
annee = dirs[2]
dep = dirs[3]
dep = DEPARTMENTS_LIST.get(dep.lower(), '')
promotion = u'%s %s' % (dep, annee)
data['annee'] = annee
data['promotion'] = promotion
# Mail
pmail = infos.get('mailRoutingAddress', [])
if pmail:
data['email'] = pmail[0].decode("utf-8")
return data
def get_ldap_infos(clipper):
assert clipper.isalnum()
data = {}
try:
l = _init_ldap()
info = l.search_s('dc=spi,dc=ens,dc=fr',
ldap.SCOPE_SUBTREE,
('(uid=%s)' % (clipper,)),
['cn',
'mailRoutingAddress',
'homeDirectory'])
if len(info) > 0:
data = _extract_infos_from_ldap(info[0][1], data)
except ldap.LDAPError:
pass
return data
class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter):
"""
A class to manage the fact that people loose their account at the end of
@ -98,61 +21,63 @@ class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter):
"""
def pre_social_login(self, request, sociallogin):
"""
If a clipper connection has already existed with the uid, it checks
that this connection still belongs to the user it was associated with.
This check is performed by comparing the generated username corresponding
to this connection with the old one.
If the check succeeds, it simply reactivates the clipper connection as
belonging to the associated user.
If the check fails, it frees the elements (as the clipper email
address) which will be assigned to the new connection later.
"""
if sociallogin.account.provider != "clipper":
return super(LongTermClipperAccountAdapter,
self).pre_social_login(request, sociallogin)
clipper = sociallogin.account.uid
clipper_uid = sociallogin.account.uid
try:
a = SocialAccount.objects.get(provider='clipper_inactive',
uid=clipper)
old_conn = SocialAccount.objects.get(provider='clipper_inactive',
uid=clipper_uid)
except SocialAccount.DoesNotExist:
return
# An account with that uid was registered, but potentially
# deprecated at the beginning of the year
# We need to check that the user is still the same as before
ldap_data = get_ldap_infos(clipper)
ldap_data = get_ldap_infos(clipper_uid)
sociallogin._ldap_data = ldap_data
if a.user.username != self.get_username(clipper, ldap_data):
if old_conn.user.username != self.get_username(clipper_uid, ldap_data):
# The admission year is different
# We need a new SocialAccount
# But before that, we need to invalidate the email address of
# the previous user
email = ldap_data.get('email', '{}@clipper.ens.fr'.format(
clipper.strip().lower()))
u_mails = EmailAddress.objects.filter(user=a.user)
try:
clipper_mail = u_mails.get(email=email)
if clipper_mail.primary:
n_mails = u_mails.filter(primary=False)
if n_mails.exists():
n_mails[0].set_as_primary()
else:
user_email(a.user, '')
a.user.save()
clipper_mail.delete()
except EmailAddress.DoesNotExist:
pass
# We cannot reuse this SocialAccount, so we need to invalidate
# the email address of the previous user to prevent conflicts
# if a new SocialAccount is created
email = ldap_data.get('email', get_clipper_email(clipper_uid))
remove_email(old_conn.user, email)
return
# The admission year is the same, we can update the model and keep
# the previous SocialAccount instance
a.provider = 'clipper'
a.save()
old_conn.provider = 'clipper'
old_conn.save()
# Redo the thing that had failed just before
sociallogin.lookup()
def get_username(self, clipper, data):
def get_username(self, clipper_uid, data):
"""
Util function to generate a unique username, by default 'clipper@promo'
This is used to disambiguate and recognize if the person is the same
"""
if data is None or 'annee' not in data:
if data is None or 'entrance_year' not in data:
raise ValueError("No entrance year in LDAP data")
return "{}@{}".format(clipper, data.get('annee', '00'))
return "{}@{}".format(clipper_uid, data['entrance_year'])
def save_user(self, request, sociallogin, form=None):
if sociallogin.account.provider != "clipper":
@ -161,14 +86,13 @@ class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter):
user = sociallogin.user
user.set_unusable_password()
clipper = sociallogin.account.uid
clipper_uid = sociallogin.account.uid
ldap_data = sociallogin._ldap_data if hasattr(sociallogin,
'_ldap_data') \
else get_ldap_infos(clipper)
else get_ldap_infos(clipper_uid)
username = self.get_username(clipper, ldap_data)
email = ldap_data.get('email', '{}@clipper.ens.fr'.format(
clipper.strip().lower()))
username = self.get_username(clipper_uid, ldap_data)
email = ldap_data.get('email', get_clipper_email(clipper_uid))
name = ldap_data.get('name')
user_username(user, username or '')
user_email(user, email or '')
@ -213,7 +137,7 @@ def install_longterm_adapter(fake=False):
accounts = {u.username: u for u in User.objects.all()
if u.username.isalnum()}
l = _init_ldap()
l = init_ldap()
ltc_adapter = get_adapter()
info = l.search_s('dc=spi,dc=ens,dc=fr',
@ -230,26 +154,26 @@ def install_longterm_adapter(fake=False):
for userinfo in info:
infos = userinfo[1]
data = _extract_infos_from_ldap(infos)
clipper = infos["uid"][0]
user = accounts.get(clipper, None)
data = extract_infos_from_ldap(infos)
clipper_uid = data['clipper_uid']
user = accounts.get(clipper_uid, None)
if user is None:
continue
user.username = ltc_adapter.get_username(clipper, data)
user.username = ltc_adapter.get_username(clipper_uid, data)
if fake:
cases.append(clipper)
cases.append(clipper_uid)
else:
user.save()
cases.append(user.username)
if SocialAccount.objects.filter(provider='clipper',
uid=clipper).exists():
logs["updated"].append((clipper, user.username))
uid=clipper_uid).exists():
logs["updated"].append((clipper_uid, user.username))
continue
sa = SocialAccount(user=user, provider='clipper',
uid=clipper, extra_data=data)
uid=clipper_uid, extra_data=data)
if not fake:
sa.save()
logs["created"].append((clipper, user.username))
logs["created"].append((clipper_uid, user.username))
logs["unmodified"] = User.objects.exclude(username__in=cases)\
.values_list("username", flat=True)

View file

@ -15,7 +15,7 @@ from mock import patch
from .adapter import deprecate_clippers, install_longterm_adapter
_mock_ldap = MockLDAP()
ldap_patcher = patch('allauth_ens.adapter.ldap.initialize',
ldap_patcher = patch('allauth_ens.utils.ldap.initialize',
lambda x: _mock_ldap)
if django.VERSION >= (1, 10):
@ -341,7 +341,7 @@ class LongTermClipperTests(CASTestCase):
nsa0 = SocialAccount.objects.count()
ldap_patcher.stop()
with self.settings(LDAP_SERVER=''):
with self.settings(CLIPPER_LDAP_SERVER=''):
self.assertRaises(ValueError, self.client_cas_login,
self.client, provider_id="clipper",
username="test")

121
allauth_ens/utils.py Normal file
View file

@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
from django.conf import settings
from allauth.account.models import EmailAddress
from allauth.account.utils import user_email
import ldap
DEPARTMENTS_LIST = {
'phy': u'Physique',
'maths': u'Maths',
'bio': u'Biologie',
'chimie': u'Chimie',
'geol': u'Géosciences',
'dec': u'DEC',
'info': u'Informatique',
'litt': u'Littéraire',
'guests': u'Pensionnaires étrangers',
'pei': u'PEI',
}
def init_ldap():
server = getattr(settings, "CLIPPER_LDAP_SERVER", "ldaps://ldap.spi.ens.fr:636")
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT,
ldap.OPT_X_TLS_NEVER)
l = ldap.initialize(server)
l.set_option(ldap.OPT_REFERRALS, 0)
l.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
l.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND)
l.set_option(ldap.OPT_X_TLS_DEMAND, True)
l.set_option(ldap.OPT_DEBUG_LEVEL, 255)
l.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
l.set_option(ldap.OPT_TIMEOUT, 10)
return l
def extract_infos_from_ldap(infos):
data = {}
# Name
if 'cn' in infos:
data['name'] = infos['cn'][0].decode("utf-8")
# Parsing homeDirectory to get entrance year and departments
if 'homeDirectory' in infos:
dirs = infos['homeDirectory'][0].decode("utf-8").split('/')
if len(dirs) >= 4 and dirs[1] == 'users':
# Assume template "/users/<year>/<department>/clipper/"
annee = dirs[2]
dep = dirs[3]
dep_fancy = DEPARTMENTS_LIST.get(dep.lower(), '')
promotion = u'%s %s' % (dep, annee)
data['entrance_year'] = annee
data['department_code'] = dep
data['department'] = dep_fancy
# Mail
pmail = infos.get('mailRoutingAddress', [])
if pmail:
data['email'] = pmail[0].decode("utf-8")
# User id
if 'uid' in infos:
data['clipper_uid'] = infos['uid'][0].decode("utf-8").strip().lower()
return data
def get_ldap_infos(clipper_uid):
assert clipper_uid.isalnum()
data = {}
try:
l = init_ldap()
info = l.search_s('dc=spi,dc=ens,dc=fr',
ldap.SCOPE_SUBTREE,
('(uid=%s)' % (clipper_uid,)),
['cn',
'mailRoutingAddress',
'homeDirectory'])
if len(info) > 0:
data = extract_infos_from_ldap(info[0][1])
except ldap.LDAPError:
pass
return data
def get_clipper_email(clipper):
return '{}@clipper.ens.fr'.format(clipper.strip().lower())
def remove_email(user, email):
"""
Removes an email address of a user.
If it is his primary email address, it sets another email address as
primary, preferably verified.
"""
u_mailaddrs = user.emailaddress_set.filter(user=user)
try:
mailaddr = user.emailaddress_set.get(email=email)
except EmailAddress.DoesNotExist:
return
if mailaddr.primary:
others = u_mailaddrs.filter(primary=False)
# Prefer a verified mail.
new_primary = (
others.filter(verified=True).last() or
others.last()
)
if new_primary:
# It also updates 'user.EMAIL_FIELD'.
new_primary.set_as_primary()
else:
user_email(user, '')
user.save()
mailaddr.delete()