diff --git a/README.rst b/README.rst index 2a35879..5b30afa 100644 --- a/README.rst +++ b/README.rst @@ -187,6 +187,7 @@ Account deprecation Customize You can customize the SocialAccountAdapter by inheriting ``allauth_ens.adapter.LongTermClipperAccountAdapter``. You might want to modify ``get_username(clipper, data)`` to change the default username format. This function is used to disambiguate in the account deprecation process. + By default, ``get_username`` raises a ``ValueError`` when the connexion to the LDAP failed or did not allow to retrieve the user's entrance year. Overriding ``get_username`` (as done in the example website) allows to get rid of that behaviour, and for instance attribute a default entrance year. Initial migration If you used allauth without LongTermClipperAccountAdapter, or another CAS interface to log in, you need to update the Users to the new username policy, and (in the second case) to create the SocialAccount instances to link CAS and Users. This can be done easily with ``$ python manage.py install_longterm``. diff --git a/allauth_ens/adapter.py b/allauth_ens/adapter.py index e0cc484..dadbd2a 100644 --- a/allauth_ens/adapter.py +++ b/allauth_ens/adapter.py @@ -1,16 +1,18 @@ # -*- coding: utf-8 -*- -import ldap -from allauth.account.utils import user_email, user_field, user_username -from allauth.account.models import EmailAddress -from allauth.socialaccount.adapter import DefaultSocialAccountAdapter, get_account_adapter, get_adapter -from allauth.socialaccount.models import SocialAccount from django.conf import settings from django.contrib.auth import get_user_model -User = get_user_model() +from allauth.account.models import EmailAddress +from allauth.account.utils import user_email, user_field, user_username +from allauth.socialaccount.adapter import ( + DefaultSocialAccountAdapter, get_account_adapter, get_adapter, +) +from allauth.socialaccount.models import SocialAccount -import six +import ldap + +User = get_user_model() DEPARTMENTS_LIST = { 'phy': u'Physique', @@ -25,6 +27,7 @@ DEPARTMENTS_LIST = { 'pei': u'PEI', } + def _init_ldap(): server = getattr(settings, "LDAP_SERVER", "ldaps://ldap.spi.ens.fr:636") ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, @@ -39,6 +42,7 @@ def _init_ldap(): l.set_option(ldap.OPT_TIMEOUT, 10) return l + def _extract_infos_from_ldap(infos, data={}): # Name data['name'] = infos.get('cn', [b''])[0].decode("utf-8") @@ -48,7 +52,7 @@ def _extract_infos_from_ldap(infos, data={}): promotion = 'Inconnue' if 'homeDirectory' in infos: - dirs = infos['homeDirectory'][0].split('/') + dirs = infos['homeDirectory'][0].decode("utf-8").split('/') if len(dirs) >= 4 and dirs[1] == 'users': # Assume template "/users///clipper/" annee = dirs[2] @@ -61,12 +65,13 @@ def _extract_infos_from_ldap(infos, data={}): # Mail pmail = infos.get('mailRoutingAddress', []) if pmail: - data['email'] = pmail[0] + data['email'] = pmail[0].decode("utf-8") return data + def get_ldap_infos(clipper): assert clipper.isalnum() - data = {'email':'{}@clipper.ens.fr'.format(clipper.strip().lower())} + data = {} try: l = _init_ldap() @@ -75,11 +80,11 @@ def get_ldap_infos(clipper): ('(uid=%s)' % (clipper,)), ['cn', 'mailRoutingAddress', - 'homeDirectory' ]) + 'homeDirectory']) if len(info) > 0: data = _extract_infos_from_ldap(info[0][1], data) - + except ldap.LDAPError: pass @@ -88,21 +93,22 @@ def get_ldap_infos(clipper): class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter): """ - A class to manage the fact that people loose their account at the end of + A class to manage the fact that people loose their account at the end of their scolarity and that their clipper login might be reused later """ def pre_social_login(self, request, sociallogin): if sociallogin.account.provider != "clipper": - return super(LongTermClipperAccountAdapter, self).pre_social_login(request, sociallogin) - + return super(LongTermClipperAccountAdapter, + self).pre_social_login(request, sociallogin) + clipper = sociallogin.account.uid try: a = SocialAccount.objects.get(provider='clipper_inactive', uid=clipper) except SocialAccount.DoesNotExist: return - + # An account with that uid was registered, but potentially # deprecated at the beginning of the year # We need to check that the user is still the same as before @@ -114,7 +120,8 @@ class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter): # We need a new SocialAccount # But before that, we need to invalidate the email address of # the previous user - email = ldap_data.get('email') + email = ldap_data.get('email', '{}@clipper.ens.fr'.format( + clipper.strip().lower())) u_mails = EmailAddress.objects.filter(user=a.user) try: clipper_mail = u_mails.get(email=email) @@ -137,34 +144,38 @@ class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter): # Redo the thing that had failed just before sociallogin.lookup() - def get_username(self, clipper, data): """ Util function to generate a unique username, by default 'clipper@promo' This is used to disambiguate and recognize if the person is the same """ + if data is None or 'annee' not in data: + raise ValueError("No entrance year in LDAP data") return "{}@{}".format(clipper, data.get('annee', '00')) - + def save_user(self, request, sociallogin, form=None): if sociallogin.account.provider != "clipper": - return Super(LongTermClipperAccountAdapter, self).save_user(request, sociallogin, form) + return super(LongTermClipperAccountAdapter, + self).save_user(request, sociallogin, form) user = sociallogin.user user.set_unusable_password() - + clipper = sociallogin.account.uid - ldap_data = sociallogin._ldap_data if hasattr(sociallogin, '_ldap_data') \ - else get_ldap_infos(clipper) - + ldap_data = sociallogin._ldap_data if hasattr(sociallogin, + '_ldap_data') \ + else get_ldap_infos(clipper) + username = self.get_username(clipper, ldap_data) - email = ldap_data.get('email') + email = ldap_data.get('email', '{}@clipper.ens.fr'.format( + clipper.strip().lower())) name = ldap_data.get('name') user_username(user, username or '') user_email(user, email or '') name_parts = (name or '').split(' ') user_field(user, 'first_name', name_parts[0]) user_field(user, 'last_name', ' '.join(name_parts[1:])) - + # Ignore form get_account_adapter().populate_username(request, user) @@ -172,46 +183,51 @@ class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter): sociallogin.account.extra_data = sociallogin.extra_data = ldap_data sociallogin.save(request) sociallogin.account.save() - + return user + def deprecate_clippers(): """ - Marks all the SocialAccount with clipper as deprecated, by setting their + Marks all the SocialAccount with clipper as deprecated, by setting their provider to 'clipper_inactive' """ - + clippers = SocialAccount.objects.filter(provider='clipper') c_uids = clippers.values_list('uid', flat=True) - - # Clear old clipper accounts that were replaced by new ones (to avoid conflicts) - SocialAccount.objects.filter(provider='clipper_inactive', uid__in=c_uids).delete() - + + # Clear old clipper accounts that were replaced by new ones + # (to avoid conflicts) + SocialAccount.objects.filter(provider='clipper_inactive', + uid__in=c_uids).delete() + # Deprecate accounts clippers.update(provider='clipper_inactive') + def install_longterm_adapter(fake=False): """ - Manages the transition from an older django_cas or an allauth_ens installation - without LongTermClipperAccountAdapter + Manages the transition from an older django_cas or an allauth_ens + installation without LongTermClipperAccountAdapter """ - - accounts = {u.username: u for u in User.objects.all() if u.username.isalnum()} + + accounts = {u.username: u for u in User.objects.all() + if u.username.isalnum()} l = _init_ldap() ltc_adapter = get_adapter() - + info = l.search_s('dc=spi,dc=ens,dc=fr', ldap.SCOPE_SUBTREE, ("(|{})".format(''.join(("(uid=%s)" % (un,)) for un in accounts.keys()))), - [str("uid"), - str("cn"), - str("mailRoutingAddress"), - str("homeDirectory") ]) + ['uid', + 'cn', + 'mailRoutingAddress', + 'homeDirectory']) logs = {"created": [], "updated": []} cases = [] - + for userinfo in info: infos = userinfo[1] data = _extract_infos_from_ldap(infos) @@ -225,13 +241,16 @@ def install_longterm_adapter(fake=False): else: user.save() cases.append(user.username) - if SocialAccount.objects.filter(provider='clipper', uid=clipper).exists(): + if SocialAccount.objects.filter(provider='clipper', + uid=clipper).exists(): logs["updated"].append((clipper, user.username)) continue - sa = SocialAccount(user=user, provider='clipper', uid=clipper, extra_data=data) + sa = SocialAccount(user=user, provider='clipper', + uid=clipper, extra_data=data) if not fake: sa.save() logs["created"].append((clipper, user.username)) - - logs["unmodified"] = User.objects.exclude(username__in=cases).values_list("username", flat=True) + + logs["unmodified"] = User.objects.exclude(username__in=cases)\ + .values_list("username", flat=True) return logs diff --git a/allauth_ens/management/commands/deprecate_clippers.py b/allauth_ens/management/commands/deprecate_clippers.py index a824fd8..257d707 100644 --- a/allauth_ens/management/commands/deprecate_clippers.py +++ b/allauth_ens/management/commands/deprecate_clippers.py @@ -1,8 +1,9 @@ -#coding: utf-8 -from django.core.management.base import BaseCommand, CommandError +# coding: utf-8 +from django.core.management.base import BaseCommand from allauth_ens.adapter import deprecate_clippers + class Command(BaseCommand): help = 'Deprecates clipper SocialAccounts so as to avoid conflicts' @@ -11,4 +12,5 @@ class Command(BaseCommand): def handle(self, *args, **options): deprecate_clippers() - self.stdout.write(self.style.SUCCESS(u'Clippers deprecation successful')) + self.stdout.write(self.style.SUCCESS( + 'Clippers deprecation successful')) diff --git a/allauth_ens/management/commands/install_longterm.py b/allauth_ens/management/commands/install_longterm.py index f733010..40ccde3 100644 --- a/allauth_ens/management/commands/install_longterm.py +++ b/allauth_ens/management/commands/install_longterm.py @@ -1,27 +1,35 @@ -#coding: utf-8 -from django.core.management.base import BaseCommand, CommandError +# coding: utf-8 +from django.core.management.base import BaseCommand from allauth_ens.adapter import install_longterm_adapter + class Command(BaseCommand): - help = 'Manages the transition from an older django_cas or an allauth_ens installation without LongTermClipperAccountAdapter' + help = 'Manages the transition from an older django_cas' \ + 'or an allauth_ens installation without ' \ + 'LongTermClipperAccountAdapter' def add_arguments(self, parser): parser.add_argument( '--fake', action='store_true', default=False, - help='Does not save the models created/updated, only shows the list', + help=('Does not save the models created/updated,' + 'only shows the list'), ) pass def handle(self, *args, **options): logs = install_longterm_adapter(options.get("fake", False)) - self.stdout.write("Social accounts created : %d" % len(logs["created"])) - self.stdout.write(" ".join(("%s -> %s" % s) for s in logs["created"])) - self.stdout.write("Social accounts displaced : %d" % len(logs["updated"])) - self.stdout.write(" ".join(("%s -> %s" % s) for s in logs["updated"])) - self.stdout.write("User accounts unmodified : %d" % len(logs["unmodified"])) + self.stdout.write("Social accounts created : %d" + % len(logs["created"])) + self.stdout.write(" ".join(("%s -> %s" % s) for s in logs["created"])) + self.stdout.write("Social accounts displaced : %d" + % len(logs["updated"])) + self.stdout.write(" ".join(("%s -> %s" % s) for s in logs["updated"])) + self.stdout.write("User accounts unmodified : %d" + % len(logs["unmodified"])) self.stdout.write(" ".join(logs["unmodified"])) - self.stdout.write(self.style.SUCCESS(u'LongTermClipper migration successful')) + self.stdout.write(self.style.SUCCESS( + "LongTermClipper migration successful")) diff --git a/allauth_ens/providers/clipper/provider.py b/allauth_ens/providers/clipper/provider.py index fa057b2..41bb56b 100644 --- a/allauth_ens/providers/clipper/provider.py +++ b/allauth_ens/providers/clipper/provider.py @@ -1,11 +1,8 @@ # -*- coding: utf-8 -*- -import ldap - from allauth.account.models import EmailAddress from allauth.socialaccount.providers.base import ProviderAccount -from allauth_cas.providers import CASProvider -from django.conf import settings +from allauth_cas.providers import CASProvider class ClipperAccount(ProviderAccount): diff --git a/allauth_ens/tests.py b/allauth_ens/tests.py index 5272614..eb276c6 100644 --- a/allauth_ens/tests.py +++ b/allauth_ens/tests.py @@ -6,15 +6,17 @@ from django.contrib.sites.models import Site from django.core import mail from django.test import TestCase, override_settings -from mock import patch -from fakeldap import MockLDAP - -from allauth_cas.test.testcases import CASTestCase, CASViewTestCase -from .adapter import deprecate_clippers, install_longterm_adapter from allauth.socialaccount.models import SocialAccount +from allauth_cas.test.testcases import CASTestCase +from fakeldap import MockLDAP +from mock import patch + +from .adapter import deprecate_clippers, install_longterm_adapter + _mock_ldap = MockLDAP() -ldap_patcher = patch('allauth_ens.adapter.ldap.initialize', lambda x: _mock_ldap) +ldap_patcher = patch('allauth_ens.adapter.ldap.initialize', + lambda x: _mock_ldap) if django.VERSION >= (1, 10): from django.urls import reverse @@ -37,6 +39,7 @@ def prevent_logout_pwd_change(client, user): session[HASH_SESSION_KEY] = user.get_session_auth_hash() session.save() + class ViewsTests(TestCase): """ Checks (barely) that templates do not contain errors. @@ -145,70 +148,77 @@ class ViewsTests(TestCase): r = self.client.get(reverse('account_reset_password_from_key_done')) self.assertEqual(r.status_code, 200) -@override_settings(SOCIALACCOUNT_ADAPTER='allauth_ens.adapter.LongTermClipperAccountAdapter') -class LongTermClipperTests(CASTestCase): +@override_settings( + SOCIALACCOUNT_ADAPTER='allauth_ens.adapter.LongTermClipperAccountAdapter' +) +class LongTermClipperTests(CASTestCase): def setUp(self): ldap_patcher.start() def tearDown(self): + ldap_patcher.stop() _mock_ldap.reset() def _setup_ldap(self, promo=12, username="test"): - import ldap - _mock_ldap.directory['dc=spi,dc=ens,dc=fr']={ + _mock_ldap.directory['dc=spi,dc=ens,dc=fr'] = { 'uid': [username], - 'cn': ['John Smith'], - 'mailRoutingAddress' : ['test@clipper.ens.fr'], - 'homeDirectory': ["/users/%d/phy/test/" % promo], + 'cn': [b'John Smith'], + 'mailRoutingAddress': [b'test@clipper.ens.fr'], + 'homeDirectory': [b'/users/%d/phy/test/' % promo], } def _count_ldap_queries(self): queries = _mock_ldap.ldap_methods_called() count = len([l for l in queries if l != 'set_option']) return count - + def test_new_connexion(self): self._setup_ldap() - - r = self.client_cas_login(self.client, provider_id="clipper", username="test") + + r = self.client_cas_login(self.client, provider_id="clipper", + username="test") u = r.context['user'] - + self.assertEqual(u.username, "test@12") self.assertEqual(u.first_name, "John") self.assertEqual(u.last_name, "Smith") self.assertEqual(u.email, "test@clipper.ens.fr") self.assertEqual(self._count_ldap_queries(), 1) - + sa = list(SocialAccount.objects.all())[-1] self.assertEqual(sa.user.id, u.id) def test_connect_disconnect(self): self._setup_ldap() - r0 = self.client_cas_login(self.client, provider_id="clipper", username="test") + r0 = self.client_cas_login(self.client, provider_id="clipper", + username="test") self.assertIn("_auth_user_id", self.client.session) self.assertIn('user', r0.context) - r1 = self.client.logout() + self.client.logout() self.assertNotIn("_auth_user_id", self.client.session) - + def test_second_connexion(self): self._setup_ldap() - self.client_cas_login(self.client, provider_id="clipper", username="test") + self.client_cas_login(self.client, provider_id="clipper", + username="test") self.client.logout() nu = User.objects.count() - - self.client_cas_login(self.client, provider_id="clipper", username="test") + + self.client_cas_login(self.client, provider_id="clipper", + username="test") self.assertEqual(User.objects.count(), nu) self.assertEqual(self._count_ldap_queries(), 1) def test_deprecation(self): self._setup_ldap() - self.client_cas_login(self.client, provider_id="clipper", username="test") + self.client_cas_login(self.client, provider_id="clipper", + username="test") deprecate_clippers() - + sa = SocialAccount.objects.all()[0] self.assertEqual(sa.provider, "clipper_inactive") @@ -220,9 +230,9 @@ class LongTermClipperTests(CASTestCase): n_sa0 = SocialAccount.objects.count() n_u0 = User.objects.count() self.client.logout() - + deprecate_clippers() - + r = self.client_cas_login(self.client, provider_id="clipper", username="test") user1 = r.context['user'] @@ -241,18 +251,18 @@ class LongTermClipperTests(CASTestCase): n_sa0 = SocialAccount.objects.count() n_u0 = User.objects.count() self.client.logout() - + deprecate_clippers() self._setup_ldap(13) r = self.client_cas_login(self.client, provider_id="clipper", username="test") - + user1 = r.context['user'] sa1 = list(SocialAccount.objects.all()) n_u1 = User.objects.count() - self.assertEqual(len(sa1), n_sa0+1) - self.assertEqual(n_u1, n_u0+1) + self.assertEqual(len(sa1), n_sa0 + 1) + self.assertEqual(n_u1, n_u0 + 1) self.assertNotEqual(user1.id, user0.id) def test_multiple_deprecation(self): @@ -260,18 +270,18 @@ class LongTermClipperTests(CASTestCase): r = self.client_cas_login(self.client, provider_id="clipper", username="test") self.client.logout() - + self._setup_ldap(15, "truc") r = self.client_cas_login(self.client, provider_id="clipper", username="truc") self.client.logout() sa0 = SocialAccount.objects.count() - + deprecate_clippers() self._setup_ldap(13) - r = self.client_cas_login(self.client, provider_id="clipper", - username="test") + self.client_cas_login(self.client, provider_id="clipper", + username="test") self.client.logout() sa1 = SocialAccount.objects.count() @@ -282,12 +292,12 @@ class LongTermClipperTests(CASTestCase): # Older "test" inactive SocialAccount gets erased by new one # while "truc" remains self.assertEqual(sa0, sa2) - self.assertEqual(sa1, sa0+1) + self.assertEqual(sa1, sa0 + 1) def test_longterm_installer_from_allauth(self): self._setup_ldap(12) - with self.settings(SOCIALACCOUNT_ADAPTER=\ - 'allauth.socialaccount.adapter.DefaultSocialAccountAdapter'): + with self.settings(SOCIALACCOUNT_ADAPTER= + 'allauth.socialaccount.adapter.DefaultSocialAccountAdapter'): r = self.client_cas_login(self.client, provider_id="clipper", username='test') user0 = r.context["user"] @@ -307,11 +317,12 @@ class LongTermClipperTests(CASTestCase): self.assertEqual(user1.username, "test@12") def test_longterm_installer_from_djangocas(self): - with self.settings(SOCIALACCOUNT_ADAPTER=\ - 'allauth.socialaccount.adapter.DefaultSocialAccountAdapter'): - user0 = User.objects.create_user('test', 'test@clipper.ens.fr', 'test') + with self.settings(SOCIALACCOUNT_ADAPTER= + 'allauth.socialaccount.adapter.DefaultSocialAccountAdapter'): + user0 = User.objects.create_user('test', 'test@clipper.ens.fr', + 'test') nsa0 = SocialAccount.objects.count() - + self._setup_ldap(12) l = install_longterm_adapter() @@ -322,5 +333,21 @@ class LongTermClipperTests(CASTestCase): user1 = r.context["user"] nsa1 = SocialAccount.objects.count() self.assertEqual(user1.id, user0.id) - self.assertEqual(nsa1, nsa0+1) + self.assertEqual(nsa1, nsa0 + 1) self.assertEqual(user1.username, "test@12") + + def test_disconnect_ldap(self): + nu0 = User.objects.count() + nsa0 = SocialAccount.objects.count() + + ldap_patcher.stop() + with self.settings(LDAP_SERVER=''): + self.assertRaises(ValueError, self.client_cas_login, + self.client, provider_id="clipper", + username="test") + + nu1 = User.objects.count() + nsa1 = SocialAccount.objects.count() + self.assertEqual(nu0, nu1) + self.assertEqual(nsa0, nsa1) + ldap_patcher.start() diff --git a/example/adapter.py b/example/adapter.py index f1079f5..b28cfdd 100644 --- a/example/adapter.py +++ b/example/adapter.py @@ -8,4 +8,10 @@ class AccountAdapter(DefaultAccountAdapter): class SocialAccountAdapter(LongTermClipperAccountAdapter): - pass + + def get_username(self, clipper, data): + """ + Exception-free version of get_username, so that it works even outside of + the ENS (if no access to LDAP server) + """ + return "{}@{}".format(clipper, data.get('annee', '00')) diff --git a/tox.ini b/tox.ini index ec77d8d..513c573 100644 --- a/tox.ini +++ b/tox.ini @@ -15,7 +15,8 @@ deps = django111: django>=1.11,<2.0 django20: django>=2.0,<2.1 coverage - mock ; python_version < "3.0" + fakeldap + mock usedevelop= True commands = python -V