From 17fef409a8aed1ffd8e479779b022281d665943d Mon Sep 17 00:00:00 2001 From: Evarin Date: Sun, 3 Jun 2018 22:10:34 +0200 Subject: [PATCH] More readable and organized code Working from Aurelien's code reviews --- README.rst | 42 ++++++++--- allauth_ens/adapter.py | 168 +++++++++++------------------------------ allauth_ens/tests.py | 4 +- allauth_ens/utils.py | 121 +++++++++++++++++++++++++++++ 4 files changed, 200 insertions(+), 135 deletions(-) create mode 100644 allauth_ens/utils.py diff --git a/README.rst b/README.rst index 5b30afa..d69d2b7 100644 --- a/README.rst +++ b/README.rst @@ -148,14 +148,11 @@ Configuration SOCIALACCOUNT_PROVIDERS = { # … - 'clipper': { - # These settings control whether a message containing a link to # disconnect from the CAS server is added when users log out. 'MESSAGE_SUGGEST_LOGOUT_ON_LOGOUT': True, 'MESSAGE_SUGGEST_LOGOUT_ON_LOGOUT_LEVEL': messages.INFO, - }, } @@ -164,33 +161,56 @@ Auto-signup - username: ```` - email (primary and verified): ``@clipper.ens.fr`` +******** +Adapters +******** Long Term Clipper Adapter ========================= -We provide an easy-to-use SocialAccountAdapter to handle the fact that Clipper Accounts are not eternal, and that there is no guarantee that the clipper usernames won't be reused later. +We provide an easy-to-use SocialAccountAdapter to handle the fact that Clipper +accounts are not eternal, and that there is no guarantee that the clipper +usernames won't be reused later. -This adapter also handles getting basic information about the user from SPI's LDAP. +This adapter also handles getting basic information about the user from SPI's +LDAP. Configuration - Set ``SOCIALACCOUNT_ADAPTER='allauth_ens.adapter.LongTermClipperAccountAdapter'`` in `settings.py` + Set ``SOCIALACCOUNT_ADAPTER='allauth_ens.adapter.LongTermClipperAccountAdapter'`` + in `settings.py` Auto-signup Populated data - username: ``@`` - email: from LDAP's *mailRoutingAddress* field, or ``@clipper.ens.fr`` - first_name, last_name from LDAP's *cn* field - - extra_data in SociallAccount instance, containing these field, plus *annee* and *promotion* parsed from LDAP's *homeDirectory* field (available only on first connection) + - extra_data in SocialAccount instance, containing these field, plus *annee* + and *promotion* parsed from LDAP's *homeDirectory* field (available only on + first connection) Account deprecation - At the beginning of each year (i.e. early November), to prevent clipper username conflicts, you should run ``$ python manage.py deprecate_clippers``. Every association clipper username <-> user will then be put on hold, and at the first subsequent connection, a verification of the account will be made (using LDAP), so that a known user keeps his account, but a newcomer won't inherit an archicube's. + At the beginning of each year (i.e. early November), to prevent clipper + username conflicts, you should run ``$ python manage.py deprecate_clippers``. + Every association clipper username <-> user will then be put on hold, and at + the first subsequent connection, a verification of the account will be made + (using LDAP), so that a known user keeps his account, but a newcomer won't + inherit an archicube's. Customize - You can customize the SocialAccountAdapter by inheriting ``allauth_ens.adapter.LongTermClipperAccountAdapter``. You might want to modify ``get_username(clipper, data)`` to change the default username format. This function is used to disambiguate in the account deprecation process. - By default, ``get_username`` raises a ``ValueError`` when the connexion to the LDAP failed or did not allow to retrieve the user's entrance year. Overriding ``get_username`` (as done in the example website) allows to get rid of that behaviour, and for instance attribute a default entrance year. + You can customize the SocialAccountAdapter by inheriting + ``allauth_ens.adapter.LongTermClipperAccountAdapter``. You might want to + modify ``get_username(clipper, data)`` to change the default username format. + This function is used to disambiguate in the account deprecation process. + By default, ``get_username`` raises a ``ValueError`` when the connexion to the + LDAP failed or did not allow to retrieve the user's entrance year. Overriding + ``get_username`` (as done in the example website) allows to get rid of that + behaviour, and for instance attribute a default entrance year. Initial migration - If you used allauth without LongTermClipperAccountAdapter, or another CAS interface to log in, you need to update the Users to the new username policy, and (in the second case) to create the SocialAccount instances to link CAS and Users. This can be done easily with ``$ python manage.py install_longterm``. + If you used allauth without LongTermClipperAccountAdapter, or another CAS + interface to log in, you need to update the Users to the new username policy, + and (in the second case) to create the SocialAccount instances to link CAS and + Users. This can be done easily with ``$ python manage.py install_longterm``. ********* Demo Site diff --git a/allauth_ens/adapter.py b/allauth_ens/adapter.py index dadbd2a..50c5d33 100644 --- a/allauth_ens/adapter.py +++ b/allauth_ens/adapter.py @@ -1,9 +1,7 @@ # -*- coding: utf-8 -*- -from django.conf import settings from django.contrib.auth import get_user_model -from allauth.account.models import EmailAddress from allauth.account.utils import user_email, user_field, user_username from allauth.socialaccount.adapter import ( DefaultSocialAccountAdapter, get_account_adapter, get_adapter, @@ -12,85 +10,10 @@ from allauth.socialaccount.models import SocialAccount import ldap +from .utils import extract_infos_from_ldap, get_ldap_infos, get_clipper_email, remove_email, init_ldap + User = get_user_model() -DEPARTMENTS_LIST = { - 'phy': u'Physique', - 'maths': u'Maths', - 'bio': u'Biologie', - 'chimie': u'Chimie', - 'geol': u'Géosciences', - 'dec': u'DEC', - 'info': u'Informatique', - 'litt': u'Littéraire', - 'guests': u'Pensionnaires étrangers', - 'pei': u'PEI', -} - - -def _init_ldap(): - server = getattr(settings, "LDAP_SERVER", "ldaps://ldap.spi.ens.fr:636") - ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, - ldap.OPT_X_TLS_NEVER) - l = ldap.initialize(server) - l.set_option(ldap.OPT_REFERRALS, 0) - l.set_option(ldap.OPT_PROTOCOL_VERSION, 3) - l.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND) - l.set_option(ldap.OPT_X_TLS_DEMAND, True) - l.set_option(ldap.OPT_DEBUG_LEVEL, 255) - l.set_option(ldap.OPT_NETWORK_TIMEOUT, 10) - l.set_option(ldap.OPT_TIMEOUT, 10) - return l - - -def _extract_infos_from_ldap(infos, data={}): - # Name - data['name'] = infos.get('cn', [b''])[0].decode("utf-8") - - # Parsing homeDirectory to get entrance year and departments - annee = '00' - promotion = 'Inconnue' - - if 'homeDirectory' in infos: - dirs = infos['homeDirectory'][0].decode("utf-8").split('/') - if len(dirs) >= 4 and dirs[1] == 'users': - # Assume template "/users///clipper/" - annee = dirs[2] - dep = dirs[3] - dep = DEPARTMENTS_LIST.get(dep.lower(), '') - promotion = u'%s %s' % (dep, annee) - data['annee'] = annee - data['promotion'] = promotion - - # Mail - pmail = infos.get('mailRoutingAddress', []) - if pmail: - data['email'] = pmail[0].decode("utf-8") - return data - - -def get_ldap_infos(clipper): - assert clipper.isalnum() - data = {} - try: - l = _init_ldap() - - info = l.search_s('dc=spi,dc=ens,dc=fr', - ldap.SCOPE_SUBTREE, - ('(uid=%s)' % (clipper,)), - ['cn', - 'mailRoutingAddress', - 'homeDirectory']) - - if len(info) > 0: - data = _extract_infos_from_ldap(info[0][1], data) - - except ldap.LDAPError: - pass - - return data - - class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter): """ A class to manage the fact that people loose their account at the end of @@ -98,61 +21,63 @@ class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter): """ def pre_social_login(self, request, sociallogin): + """ + If a clipper connection has already existed with the uid, it checks + that this connection still belongs to the user it was associated with. + + This check is performed by comparing the generated username corresponding + to this connection with the old one. + + If the check succeeds, it simply reactivates the clipper connection as + belonging to the associated user. + + If the check fails, it frees the elements (as the clipper email + address) which will be assigned to the new connection later. + """ + if sociallogin.account.provider != "clipper": return super(LongTermClipperAccountAdapter, self).pre_social_login(request, sociallogin) - clipper = sociallogin.account.uid + clipper_uid = sociallogin.account.uid try: - a = SocialAccount.objects.get(provider='clipper_inactive', - uid=clipper) + old_conn = SocialAccount.objects.get(provider='clipper_inactive', + uid=clipper_uid) except SocialAccount.DoesNotExist: return # An account with that uid was registered, but potentially # deprecated at the beginning of the year # We need to check that the user is still the same as before - ldap_data = get_ldap_infos(clipper) + ldap_data = get_ldap_infos(clipper_uid) sociallogin._ldap_data = ldap_data - if a.user.username != self.get_username(clipper, ldap_data): + if old_conn.user.username != self.get_username(clipper_uid, ldap_data): # The admission year is different - # We need a new SocialAccount - # But before that, we need to invalidate the email address of - # the previous user - email = ldap_data.get('email', '{}@clipper.ens.fr'.format( - clipper.strip().lower())) - u_mails = EmailAddress.objects.filter(user=a.user) - try: - clipper_mail = u_mails.get(email=email) - if clipper_mail.primary: - n_mails = u_mails.filter(primary=False) - if n_mails.exists(): - n_mails[0].set_as_primary() - else: - user_email(a.user, '') - a.user.save() - clipper_mail.delete() - except EmailAddress.DoesNotExist: - pass + # We cannot reuse this SocialAccount, so we need to invalidate + # the email address of the previous user to prevent conflicts + # if a new SocialAccount is created + email = ldap_data.get('email', get_clipper_email(clipper_uid)) + remove_email(old_conn.user, email) + return # The admission year is the same, we can update the model and keep # the previous SocialAccount instance - a.provider = 'clipper' - a.save() + old_conn.provider = 'clipper' + old_conn.save() # Redo the thing that had failed just before sociallogin.lookup() - def get_username(self, clipper, data): + def get_username(self, clipper_uid, data): """ Util function to generate a unique username, by default 'clipper@promo' This is used to disambiguate and recognize if the person is the same """ - if data is None or 'annee' not in data: + if data is None or 'entrance_year' not in data: raise ValueError("No entrance year in LDAP data") - return "{}@{}".format(clipper, data.get('annee', '00')) + return "{}@{}".format(clipper_uid, data['entrance_year']) def save_user(self, request, sociallogin, form=None): if sociallogin.account.provider != "clipper": @@ -161,14 +86,13 @@ class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter): user = sociallogin.user user.set_unusable_password() - clipper = sociallogin.account.uid + clipper_uid = sociallogin.account.uid ldap_data = sociallogin._ldap_data if hasattr(sociallogin, '_ldap_data') \ - else get_ldap_infos(clipper) + else get_ldap_infos(clipper_uid) - username = self.get_username(clipper, ldap_data) - email = ldap_data.get('email', '{}@clipper.ens.fr'.format( - clipper.strip().lower())) + username = self.get_username(clipper_uid, ldap_data) + email = ldap_data.get('email', get_clipper_email(clipper_uid)) name = ldap_data.get('name') user_username(user, username or '') user_email(user, email or '') @@ -213,7 +137,7 @@ def install_longterm_adapter(fake=False): accounts = {u.username: u for u in User.objects.all() if u.username.isalnum()} - l = _init_ldap() + l = init_ldap() ltc_adapter = get_adapter() info = l.search_s('dc=spi,dc=ens,dc=fr', @@ -230,26 +154,26 @@ def install_longterm_adapter(fake=False): for userinfo in info: infos = userinfo[1] - data = _extract_infos_from_ldap(infos) - clipper = infos["uid"][0] - user = accounts.get(clipper, None) + data = extract_infos_from_ldap(infos) + clipper_uid = data['clipper_uid'] + user = accounts.get(clipper_uid, None) if user is None: continue - user.username = ltc_adapter.get_username(clipper, data) + user.username = ltc_adapter.get_username(clipper_uid, data) if fake: - cases.append(clipper) + cases.append(clipper_uid) else: user.save() cases.append(user.username) if SocialAccount.objects.filter(provider='clipper', - uid=clipper).exists(): - logs["updated"].append((clipper, user.username)) + uid=clipper_uid).exists(): + logs["updated"].append((clipper_uid, user.username)) continue sa = SocialAccount(user=user, provider='clipper', - uid=clipper, extra_data=data) + uid=clipper_uid, extra_data=data) if not fake: sa.save() - logs["created"].append((clipper, user.username)) + logs["created"].append((clipper_uid, user.username)) logs["unmodified"] = User.objects.exclude(username__in=cases)\ .values_list("username", flat=True) diff --git a/allauth_ens/tests.py b/allauth_ens/tests.py index eb276c6..d9c01e5 100644 --- a/allauth_ens/tests.py +++ b/allauth_ens/tests.py @@ -15,7 +15,7 @@ from mock import patch from .adapter import deprecate_clippers, install_longterm_adapter _mock_ldap = MockLDAP() -ldap_patcher = patch('allauth_ens.adapter.ldap.initialize', +ldap_patcher = patch('allauth_ens.utils.ldap.initialize', lambda x: _mock_ldap) if django.VERSION >= (1, 10): @@ -341,7 +341,7 @@ class LongTermClipperTests(CASTestCase): nsa0 = SocialAccount.objects.count() ldap_patcher.stop() - with self.settings(LDAP_SERVER=''): + with self.settings(CLIPPER_LDAP_SERVER=''): self.assertRaises(ValueError, self.client_cas_login, self.client, provider_id="clipper", username="test") diff --git a/allauth_ens/utils.py b/allauth_ens/utils.py new file mode 100644 index 0000000..57df7b1 --- /dev/null +++ b/allauth_ens/utils.py @@ -0,0 +1,121 @@ +# -*- coding: utf-8 -*- + +from django.conf import settings + +from allauth.account.models import EmailAddress +from allauth.account.utils import user_email + +import ldap + +DEPARTMENTS_LIST = { + 'phy': u'Physique', + 'maths': u'Maths', + 'bio': u'Biologie', + 'chimie': u'Chimie', + 'geol': u'Géosciences', + 'dec': u'DEC', + 'info': u'Informatique', + 'litt': u'Littéraire', + 'guests': u'Pensionnaires étrangers', + 'pei': u'PEI', +} + +def init_ldap(): + server = getattr(settings, "CLIPPER_LDAP_SERVER", "ldaps://ldap.spi.ens.fr:636") + ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, + ldap.OPT_X_TLS_NEVER) + l = ldap.initialize(server) + l.set_option(ldap.OPT_REFERRALS, 0) + l.set_option(ldap.OPT_PROTOCOL_VERSION, 3) + l.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND) + l.set_option(ldap.OPT_X_TLS_DEMAND, True) + l.set_option(ldap.OPT_DEBUG_LEVEL, 255) + l.set_option(ldap.OPT_NETWORK_TIMEOUT, 10) + l.set_option(ldap.OPT_TIMEOUT, 10) + return l + +def extract_infos_from_ldap(infos): + data = {} + + # Name + if 'cn' in infos: + data['name'] = infos['cn'][0].decode("utf-8") + + # Parsing homeDirectory to get entrance year and departments + if 'homeDirectory' in infos: + dirs = infos['homeDirectory'][0].decode("utf-8").split('/') + if len(dirs) >= 4 and dirs[1] == 'users': + # Assume template "/users///clipper/" + annee = dirs[2] + dep = dirs[3] + dep_fancy = DEPARTMENTS_LIST.get(dep.lower(), '') + promotion = u'%s %s' % (dep, annee) + data['entrance_year'] = annee + data['department_code'] = dep + data['department'] = dep_fancy + + # Mail + pmail = infos.get('mailRoutingAddress', []) + if pmail: + data['email'] = pmail[0].decode("utf-8") + + # User id + if 'uid' in infos: + data['clipper_uid'] = infos['uid'][0].decode("utf-8").strip().lower() + + return data + +def get_ldap_infos(clipper_uid): + assert clipper_uid.isalnum() + data = {} + try: + l = init_ldap() + + info = l.search_s('dc=spi,dc=ens,dc=fr', + ldap.SCOPE_SUBTREE, + ('(uid=%s)' % (clipper_uid,)), + ['cn', + 'mailRoutingAddress', + 'homeDirectory']) + + if len(info) > 0: + data = extract_infos_from_ldap(info[0][1]) + + except ldap.LDAPError: + pass + + return data + +def get_clipper_email(clipper): + return '{}@clipper.ens.fr'.format(clipper.strip().lower()) + +def remove_email(user, email): + """ + Removes an email address of a user. + + If it is his primary email address, it sets another email address as + primary, preferably verified. + """ + u_mailaddrs = user.emailaddress_set.filter(user=user) + try: + mailaddr = user.emailaddress_set.get(email=email) + except EmailAddress.DoesNotExist: + return + + if mailaddr.primary: + others = u_mailaddrs.filter(primary=False) + + # Prefer a verified mail. + new_primary = ( + others.filter(verified=True).last() or + others.last() + ) + + if new_primary: + # It also updates 'user.EMAIL_FIELD'. + new_primary.set_as_primary() + else: + user_email(user, '') + user.save() + + mailaddr.delete()