diff --git a/README.rst b/README.rst index 754fdf4..4e953c3 100644 --- a/README.rst +++ b/README.rst @@ -148,23 +148,72 @@ Configuration SOCIALACCOUNT_PROVIDERS = { # … - 'clipper': { - # These settings control whether a message containing a link to # disconnect from the CAS server is added when users log out. 'MESSAGE_SUGGEST_LOGOUT_ON_LOGOUT': True, 'MESSAGE_SUGGEST_LOGOUT_ON_LOGOUT_LEVEL': messages.INFO, - }, } - + Auto-signup - Poulated data + Populated data - username: ```` - email (primary and verified): ``@clipper.ens.fr`` +******** +Adapters +******** +Long Term Clipper Adapter +========================= + +We provide an easy-to-use SocialAccountAdapter to handle the fact that Clipper +accounts are not eternal, and that there is no guarantee that the clipper +usernames won't be reused later. + +This adapter also handles getting basic information about the user from SPI's +LDAP. + +**Important:** If you are building on top of *allauth*, take care to preserve +the ``extra_data['ldap']`` of ``SocialAccount`` instances related to *Clipper* +(where ``provider_id`` is ``clipper`` or ``clipper_inactive``). + +Configuration + Set ``SOCIALACCOUNT_ADAPTER='allauth_ens.adapter.LongTermClipperAccountAdapter'`` + in `settings.py` + +Auto-signup + Populated data + - *username*: ``@`` + - *email*: from LDAP's *mailRoutingAddress* field, or ``@clipper.ens.fr`` + - *first_name*, *last_name* from LDAP's *cn* field + - *entrance_year* (as 2-digit string), *department_code*, *department* and *promotion* (department+year) parsed from LDAP's *homeDirectory* field + - *extra_data* in SocialAccount instance, containing all these field except *promotion* (and available only on first connection) + +Account deprecation + At the beginning of each year (i.e. early November), to prevent clipper + username conflicts, you should run ``$ python manage.py deprecate_clippers``. + Every association clipper username <-> user will then be put on hold, and at + the first subsequent connection, a verification of the account will be made + (using LDAP), so that a known user keeps his account, but a newcomer won't + inherit an archicube's. + +Customize + You can customize the SocialAccountAdapter by inheriting + ``allauth_ens.adapter.LongTermClipperAccountAdapter``. You might want to + modify ``get_username(clipper, data)`` to change the default username format. + By default, ``get_username`` raises a ``ValueError`` when the connexion to the + LDAP failed or did not allow to retrieve the user's entrance year. Overriding + ``get_username`` (as done in the example website) allows to get rid of that + behaviour, and for instance attribute a default entrance year. + +Initial migration + If you used allauth without LongTermClipperAccountAdapter, or another CAS + interface to log in, you need to update the Users to the new username policy, + and (in the second case) to create the SocialAccount instances to link CAS and + Users. This can be done easily with ``$ python manage.py install_longterm``. + ********* Demo Site ********* @@ -201,7 +250,11 @@ Tests Local environment ----------------- -``$ ./runtests.py`` +Requirements + * fakeldap and mock, install with ``$ pip install mock fakeldap`` + +Run + * ``$ ./runtests.py`` All --- diff --git a/allauth_ens/adapter.py b/allauth_ens/adapter.py new file mode 100644 index 0000000..662c98a --- /dev/null +++ b/allauth_ens/adapter.py @@ -0,0 +1,206 @@ +# -*- coding: utf-8 -*- + +from django.contrib.auth import get_user_model + +from allauth.account.utils import user_email, user_field, user_username +from allauth.socialaccount.adapter import ( + DefaultSocialAccountAdapter, get_account_adapter, get_adapter, +) +from allauth.socialaccount.models import SocialAccount + +import ldap + +from .utils import ( + extract_infos_from_ldap, get_clipper_email, get_ldap_infos, init_ldap, + remove_email, +) + +User = get_user_model() + + +class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter): + """ + A class to manage the fact that people loose their account at the end of + their scolarity and that their clipper login might be reused later + """ + + def pre_social_login(self, request, sociallogin): + """ + If a clipper connection has already existed with the uid, it checks + that this connection still belongs to the user it was associated with. + + This check is performed by comparing the entrance years provided by the + LDAP. + + If the check succeeds, it simply reactivates the clipper connection as + belonging to the associated user. + + If the check fails, it frees the elements (as the clipper email + address) which will be assigned to the new connection later. + """ + + if sociallogin.account.provider != "clipper": + return super(LongTermClipperAccountAdapter, + self).pre_social_login(request, sociallogin) + + clipper_uid = sociallogin.account.uid + try: + old_conn = SocialAccount.objects.get(provider='clipper_inactive', + uid=clipper_uid) + except SocialAccount.DoesNotExist: + return + + # An account with that uid was registered, but potentially + # deprecated at the beginning of the year + # We need to check that the user is still the same as before + ldap_data = get_ldap_infos(clipper_uid) + sociallogin._ldap_data = ldap_data + + if ldap_data is None or 'entrance_year' not in ldap_data: + raise ValueError("No entrance year in LDAP data") + + old_conn_entrance_year = ( + old_conn.extra_data.get('ldap', {}).get('entrance_year')) + + if old_conn_entrance_year != ldap_data['entrance_year']: + # We cannot reuse this SocialAccount, so we need to invalidate + # the email address of the previous user to prevent conflicts + # if a new SocialAccount is created + email = ldap_data.get('email', get_clipper_email(clipper_uid)) + remove_email(old_conn.user, email) + + return + + # The admission year is the same, we can update the model and keep + # the previous SocialAccount instance + old_conn.provider = 'clipper' + old_conn.save() + + # Redo the thing that had failed just before + sociallogin.lookup() + + def get_username(self, clipper_uid, data): + """ + Util function to generate a unique username, by default 'clipper@promo' + """ + if data is None or 'entrance_year' not in data: + raise ValueError("No entrance year in LDAP data") + return "{}@{}".format(clipper_uid, data['entrance_year']) + + def save_user(self, request, sociallogin, form=None): + if sociallogin.account.provider != "clipper": + return super(LongTermClipperAccountAdapter, + self).save_user(request, sociallogin, form) + user = sociallogin.user + user.set_unusable_password() + + clipper_uid = sociallogin.account.uid + ldap_data = sociallogin._ldap_data if \ + hasattr(sociallogin, '_ldap_data') \ + else get_ldap_infos(clipper_uid) + + username = self.get_username(clipper_uid, ldap_data) + email = ldap_data.get('email', get_clipper_email(clipper_uid)) + name = ldap_data.get('name') + user_username(user, username or '') + user_email(user, email or '') + name_parts = (name or '').split(' ') + user_field(user, 'first_name', name_parts[0]) + user_field(user, 'last_name', ' '.join(name_parts[1:])) + + # Entrance year and department, if the user has these fields + entrance_year = ldap_data.get('entrance_year', '') + dep_code = ldap_data.get('department_code', '') + dep_fancy = ldap_data.get('department', '') + promotion = u'%s %s' % (dep_fancy, entrance_year) + user_field(user, 'entrance_year', entrance_year) + user_field(user, 'department_code', dep_code) + user_field(user, 'department', dep_fancy) + user_field(user, 'promotion', promotion) + + # Ignore form + get_account_adapter().populate_username(request, user) + + # Save extra data (only once) + sociallogin.account.extra_data['ldap'] = ldap_data + sociallogin.save(request) + sociallogin.account.save() + + return user + + +def deprecate_clippers(): + """ + Marks all the SocialAccount with clipper as deprecated, by setting their + provider to 'clipper_inactive' + """ + + clippers = SocialAccount.objects.filter(provider='clipper') + c_uids = clippers.values_list('uid', flat=True) + + # Clear old clipper accounts that were replaced by new ones + # (to avoid conflicts) + SocialAccount.objects.filter(provider='clipper_inactive', + uid__in=c_uids).delete() + + # Deprecate accounts + clippers.update(provider='clipper_inactive') + + +def install_longterm_adapter(fake=False): + """ + Manages the transition from an older django_cas or an allauth_ens + installation without LongTermClipperAccountAdapter + """ + + accounts = {u.username: u for u in User.objects.all() + if u.username.isalnum()} + ldap_connection = init_ldap() + ltc_adapter = get_adapter() + + info = ldap_connection.search_s( + 'dc=spi,dc=ens,dc=fr', + ldap.SCOPE_SUBTREE, + ("(|{})".format(''.join(("(uid=%s)" % (un,)) + for un in accounts.keys()))), + ['uid', + 'cn', + 'mailRoutingAddress', + 'homeDirectory']) + + logs = {"created": [], "updated": []} + cases = [] + + for userinfo in info: + infos = userinfo[1] + data = extract_infos_from_ldap(infos) + clipper_uid = data['clipper_uid'] + user = accounts.get(clipper_uid, None) + if user is None: + continue + user.username = ltc_adapter.get_username(clipper_uid, data) + if fake: + cases.append(clipper_uid) + else: + user.save() + cases.append(user.username) + + try: + sa = SocialAccount.objects.get(provider='clipper', uid=clipper_uid) + if not sa.extra_data.get('ldap'): + sa.extra_data['ldap'] = data + if not fake: + sa.save(update_fields=['extra_data']) + logs["updated"].append((clipper_uid, user.username)) + except SocialAccount.DoesNotExist: + sa = SocialAccount( + provider='clipper', uid=clipper_uid, + user=user, extra_data={'ldap': data}, + ) + if not fake: + sa.save() + logs["created"].append((clipper_uid, user.username)) + + logs["unmodified"] = User.objects.exclude(username__in=cases)\ + .values_list("username", flat=True) + return logs diff --git a/allauth_ens/management/__init__.py b/allauth_ens/management/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/allauth_ens/management/commands/__init__.py b/allauth_ens/management/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/allauth_ens/management/commands/deprecate_clippers.py b/allauth_ens/management/commands/deprecate_clippers.py new file mode 100644 index 0000000..257d707 --- /dev/null +++ b/allauth_ens/management/commands/deprecate_clippers.py @@ -0,0 +1,16 @@ +# coding: utf-8 +from django.core.management.base import BaseCommand + +from allauth_ens.adapter import deprecate_clippers + + +class Command(BaseCommand): + help = 'Deprecates clipper SocialAccounts so as to avoid conflicts' + + def add_arguments(self, parser): + pass + + def handle(self, *args, **options): + deprecate_clippers() + self.stdout.write(self.style.SUCCESS( + 'Clippers deprecation successful')) diff --git a/allauth_ens/management/commands/install_longterm.py b/allauth_ens/management/commands/install_longterm.py new file mode 100644 index 0000000..40ccde3 --- /dev/null +++ b/allauth_ens/management/commands/install_longterm.py @@ -0,0 +1,35 @@ +# coding: utf-8 +from django.core.management.base import BaseCommand + +from allauth_ens.adapter import install_longterm_adapter + + +class Command(BaseCommand): + help = 'Manages the transition from an older django_cas' \ + 'or an allauth_ens installation without ' \ + 'LongTermClipperAccountAdapter' + + def add_arguments(self, parser): + parser.add_argument( + '--fake', + action='store_true', + default=False, + help=('Does not save the models created/updated,' + 'only shows the list'), + ) + pass + + def handle(self, *args, **options): + logs = install_longterm_adapter(options.get("fake", False)) + self.stdout.write("Social accounts created : %d" + % len(logs["created"])) + self.stdout.write(" ".join(("%s -> %s" % s) for s in logs["created"])) + self.stdout.write("Social accounts displaced : %d" + % len(logs["updated"])) + self.stdout.write(" ".join(("%s -> %s" % s) for s in logs["updated"])) + self.stdout.write("User accounts unmodified : %d" + % len(logs["unmodified"])) + self.stdout.write(" ".join(logs["unmodified"])) + + self.stdout.write(self.style.SUCCESS( + "LongTermClipper migration successful")) diff --git a/allauth_ens/providers/clipper/provider.py b/allauth_ens/providers/clipper/provider.py index d14bee2..4ee6169 100644 --- a/allauth_ens/providers/clipper/provider.py +++ b/allauth_ens/providers/clipper/provider.py @@ -1,11 +1,8 @@ # -*- coding: utf-8 -*- -import ldap - from allauth.account.models import EmailAddress from allauth.socialaccount.providers.base import ProviderAccount -from allauth_cas.providers import CASProvider -from django.conf import settings +from allauth_cas.providers import CASProvider class ClipperAccount(ProviderAccount): @@ -21,41 +18,14 @@ class ClipperProvider(CASProvider): uid, extra = data return '{}@clipper.ens.fr'.format(uid.strip().lower()) + def extract_uid(self, data): + uid, _ = data + uid = uid.lower().strip() + return uid + def extract_common_fields(self, data): - def get_names(clipper): - assert clipper.isalnum() - try: - ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, - ldap.OPT_X_TLS_NEVER) - l = ldap.initialize("ldaps://ldap.spi.ens.fr:636") - l.set_option(ldap.OPT_REFERRALS, 0) - l.set_option(ldap.OPT_PROTOCOL_VERSION, 3) - l.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND) - l.set_option(ldap.OPT_X_TLS_DEMAND, True) - l.set_option(ldap.OPT_DEBUG_LEVEL, 255) - l.set_option(ldap.OPT_NETWORK_TIMEOUT, 10) - l.set_option(ldap.OPT_TIMEOUT, 10) - - info = l.search_s('dc=spi,dc=ens,dc=fr', - ldap.SCOPE_SUBTREE, - ('(uid=%s)' % (clipper,)), - [str("cn"), ]) - - if len(info) > 0: - fullname = info[0][1].get('cn', [''])[0].decode("utf-8") - first_name, last_name = fullname.split(' ', 1) - return first_name, last_name - - except ldap.LDAPError: - pass - - return '', '' - common = super(ClipperProvider, self).extract_common_fields(data) - fn, ln = get_names(common['username']) common['email'] = self.extract_email(data) - common['name'] = fn - common['last_name'] = ln return common def extract_email_addresses(self, data): @@ -67,8 +37,23 @@ class ClipperProvider(CASProvider): ] def extract_extra_data(self, data): + """ + If LongTermClipperAccountAdapter is in use, keep the data retrieved + from the LDAP server. + """ + from allauth.socialaccount.models import SocialAccount # noqa extra_data = super(ClipperProvider, self).extract_extra_data(data) extra_data['email'] = self.extract_email(data) + + # Preserve LDAP data at all cost. + try: + clipper_account = SocialAccount.objects.get( + provider=self.id, uid=self.extract_uid(data)) + if 'ldap' in clipper_account.extra_data: + extra_data['ldap'] = clipper_account.extra_data['ldap'] + except SocialAccount.DoesNotExist: + pass + return extra_data def message_suggest_caslogout_on_logout(self, request): diff --git a/allauth_ens/providers/clipper/tests.py b/allauth_ens/providers/clipper/tests.py index 657982c..00bb3b1 100644 --- a/allauth_ens/providers/clipper/tests.py +++ b/allauth_ens/providers/clipper/tests.py @@ -17,6 +17,18 @@ class ClipperProviderTests(CASTestCase): u = User.objects.get(username='clipper_uid') self.assertEqual(u.email, 'clipper_uid@clipper.ens.fr') + def test_extra_data_keeps_ldap_data(self): + clipper_conn = self.u.socialaccount_set.create( + uid='user', provider='clipper', + extra_data={'ldap': {'aa': 'bb'}}, + ) + + self.client_cas_login( + self.client, provider_id='clipper', username='user') + + clipper_conn.refresh_from_db() + self.assertEqual(clipper_conn.extra_data['ldap'], {'aa': 'bb'}) + class ClipperViewsTests(CASViewTestCase): diff --git a/allauth_ens/tests.py b/allauth_ens/tests.py index ef2a98c..b902d81 100644 --- a/allauth_ens/tests.py +++ b/allauth_ens/tests.py @@ -1,3 +1,5 @@ +from __future__ import unicode_literals + import re import django @@ -6,6 +8,19 @@ from django.contrib.sites.models import Site from django.core import mail from django.test import TestCase, override_settings +from allauth.socialaccount.models import SocialAccount + +import six +from allauth_cas.test.testcases import CASTestCase +from fakeldap import MockLDAP +from mock import patch + +from .adapter import deprecate_clippers, install_longterm_adapter + +_mock_ldap = MockLDAP() +ldap_patcher = patch('allauth_ens.utils.ldap.initialize', + lambda x: _mock_ldap) + if django.VERSION >= (1, 10): from django.urls import reverse else: @@ -135,3 +150,220 @@ class ViewsTests(TestCase): def test_account_reset_password_from_key_done(self): r = self.client.get(reverse('account_reset_password_from_key_done')) self.assertEqual(r.status_code, 200) + + +@override_settings( + SOCIALACCOUNT_ADAPTER='allauth_ens.adapter.LongTermClipperAccountAdapter' +) +class LongTermClipperTests(CASTestCase): + def setUp(self): + ldap_patcher.start() + + def tearDown(self): + ldap_patcher.stop() + _mock_ldap.reset() + + def _setup_ldap(self, promo=12, username="test"): + try: + buid = six.binary_type(username, 'utf-8') + home = six.binary_type('/users/%d/phy/test/' % promo, 'utf-8') + except TypeError: + buid = six.binary_type(username) + home = six.binary_type('/users/%d/phy/test/' % promo) + _mock_ldap.directory['dc=spi,dc=ens,dc=fr'] = { + 'uid': [buid], + 'cn': [b'John Smith'], + 'mailRoutingAddress': [b'test@clipper.ens.fr'], + 'homeDirectory': [home], + } + + def _count_ldap_queries(self): + queries = _mock_ldap.ldap_methods_called() + count = len([op for op in queries if op != 'set_option']) + return count + + def test_new_connexion(self): + self._setup_ldap() + + r = self.client_cas_login(self.client, provider_id="clipper", + username="test") + u = r.context['user'] + + self.assertEqual(u.username, "test@12") + self.assertEqual(u.first_name, "John") + self.assertEqual(u.last_name, "Smith") + self.assertEqual(u.email, "test@clipper.ens.fr") + self.assertEqual(self._count_ldap_queries(), 1) + + sa = list(SocialAccount.objects.all())[-1] + self.assertEqual(sa.user.id, u.id) + self.assertEqual(sa.extra_data['ldap']['entrance_year'], '12') + + def test_connect_disconnect(self): + self._setup_ldap() + r0 = self.client_cas_login(self.client, provider_id="clipper", + username="test") + self.assertIn("_auth_user_id", self.client.session) + self.assertIn('user', r0.context) + + self.client.logout() + self.assertNotIn("_auth_user_id", self.client.session) + + def test_second_connexion(self): + self._setup_ldap() + + self.client_cas_login(self.client, provider_id="clipper", + username="test") + self.client.logout() + + nu = User.objects.count() + + self.client_cas_login(self.client, provider_id="clipper", + username="test") + self.assertEqual(User.objects.count(), nu) + self.assertEqual(self._count_ldap_queries(), 1) + + def test_deprecation(self): + self._setup_ldap() + self.client_cas_login(self.client, provider_id="clipper", + username="test") + deprecate_clippers() + + sa = SocialAccount.objects.all()[0] + self.assertEqual(sa.provider, "clipper_inactive") + + def test_reconnect_after_deprecation(self): + self._setup_ldap() + r = self.client_cas_login(self.client, provider_id="clipper", + username="test") + user0 = r.context['user'] + n_sa0 = SocialAccount.objects.count() + n_u0 = User.objects.count() + self.client.logout() + + deprecate_clippers() + + r = self.client_cas_login(self.client, provider_id="clipper", + username="test") + user1 = r.context['user'] + sa1 = list(SocialAccount.objects.all()) + n_u1 = User.objects.count() + self.assertEqual(len(sa1), n_sa0) + self.assertEqual(n_u1, n_u0) + self.assertEqual(user1.id, user0.id) + self.assertEqual(self._count_ldap_queries(), 2) + + def test_override_inactive_account(self): + self._setup_ldap(12) + r = self.client_cas_login(self.client, provider_id="clipper", + username="test") + user0 = r.context['user'] + n_sa0 = SocialAccount.objects.count() + n_u0 = User.objects.count() + self.client.logout() + + deprecate_clippers() + + self._setup_ldap(13) + r = self.client_cas_login(self.client, provider_id="clipper", + username="test") + + user1 = r.context['user'] + sa1 = list(SocialAccount.objects.all()) + n_u1 = User.objects.count() + self.assertEqual(len(sa1), n_sa0 + 1) + self.assertEqual(n_u1, n_u0 + 1) + self.assertNotEqual(user1.id, user0.id) + + def test_multiple_deprecation(self): + self._setup_ldap(12) + self.client_cas_login(self.client, provider_id="clipper", + username="test") + self.client.logout() + + self._setup_ldap(15, "truc") + self.client_cas_login(self.client, provider_id="clipper", + username="truc") + self.client.logout() + sa0 = SocialAccount.objects.count() + + deprecate_clippers() + + self._setup_ldap(13) + self.client_cas_login(self.client, provider_id="clipper", + username="test") + self.client.logout() + + sa1 = SocialAccount.objects.count() + + deprecate_clippers() + sa2 = SocialAccount.objects.count() + + # Older "test" inactive SocialAccount gets erased by new one + # while "truc" remains + self.assertEqual(sa0, sa2) + self.assertEqual(sa1, sa0 + 1) + + def test_longterm_installer_from_allauth(self): + self._setup_ldap(12) + with self.settings( + SOCIALACCOUNT_ADAPTER='allauth.socialaccount.' + 'adapter.DefaultSocialAccountAdapter'): + r = self.client_cas_login(self.client, provider_id="clipper", + username='test') + user0 = r.context["user"] + nsa0 = SocialAccount.objects.count() + self.assertEqual(user0.username, "test") + self.client.logout() + + outputs = install_longterm_adapter() + + self.assertEqual(outputs["updated"], [("test", "test@12")]) + r = self.client_cas_login(self.client, provider_id="clipper", + username='test') + user1 = r.context["user"] + nsa1 = SocialAccount.objects.count() + conn = user1.socialaccount_set.get(provider='clipper') + self.assertEqual(user1.id, user0.id) + self.assertEqual(nsa1, nsa0) + self.assertEqual(user1.username, "test@12") + self.assertEqual(conn.extra_data['ldap']['entrance_year'], '12') + + def test_longterm_installer_from_djangocas(self): + with self.settings( + SOCIALACCOUNT_ADAPTER='allauth.socialaccount.' + 'adapter.DefaultSocialAccountAdapter'): + user0 = User.objects.create_user('test', 'test@clipper.ens.fr', + 'test') + nsa0 = SocialAccount.objects.count() + + self._setup_ldap(12) + + outputs = install_longterm_adapter() + + self.assertEqual(outputs["created"], [("test", "test@12")]) + r = self.client_cas_login(self.client, provider_id="clipper", + username='test') + user1 = r.context["user"] + nsa1 = SocialAccount.objects.count() + conn = user1.socialaccount_set.get(provider='clipper') + self.assertEqual(user1.id, user0.id) + self.assertEqual(nsa1, nsa0 + 1) + self.assertEqual(user1.username, "test@12") + self.assertEqual(conn.extra_data['ldap']['entrance_year'], '12') + + def test_disconnect_ldap(self): + nu0 = User.objects.count() + nsa0 = SocialAccount.objects.count() + + ldap_patcher.stop() + with self.settings(CLIPPER_LDAP_SERVER=''): + self.assertRaises(ValueError, self.client_cas_login, + self.client, provider_id="clipper", + username="test") + + nu1 = User.objects.count() + nsa1 = SocialAccount.objects.count() + self.assertEqual(nu0, nu1) + self.assertEqual(nsa0, nsa1) + ldap_patcher.start() diff --git a/allauth_ens/utils.py b/allauth_ens/utils.py new file mode 100644 index 0000000..afd29a9 --- /dev/null +++ b/allauth_ens/utils.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- + +from django.conf import settings + +from allauth.account.models import EmailAddress +from allauth.account.utils import user_email + +import ldap + +DEPARTMENTS_LIST = { + 'phy': u'Physique', + 'maths': u'Maths', + 'bio': u'Biologie', + 'chimie': u'Chimie', + 'geol': u'Géosciences', + 'dec': u'DEC', + 'info': u'Informatique', + 'litt': u'Littéraire', + 'guests': u'Pensionnaires étrangers', + 'pei': u'PEI', +} + + +def init_ldap(): + server = getattr(settings, "CLIPPER_LDAP_SERVER", + "ldaps://ldap.spi.ens.fr:636") + ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, + ldap.OPT_X_TLS_NEVER) + ldap_connection = ldap.initialize(server) + ldap_connection.set_option(ldap.OPT_REFERRALS, 0) + ldap_connection.set_option(ldap.OPT_PROTOCOL_VERSION, 3) + ldap_connection.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND) + ldap_connection.set_option(ldap.OPT_X_TLS_DEMAND, True) + ldap_connection.set_option(ldap.OPT_DEBUG_LEVEL, 255) + ldap_connection.set_option(ldap.OPT_NETWORK_TIMEOUT, 10) + ldap_connection.set_option(ldap.OPT_TIMEOUT, 10) + return ldap_connection + + +def extract_infos_from_ldap(infos): + data = {} + + # Name + if 'cn' in infos: + data['name'] = infos['cn'][0].decode("utf-8") + + # Parsing homeDirectory to get entrance year and departments + if 'homeDirectory' in infos: + dirs = infos['homeDirectory'][0].decode("utf-8").split('/') + if len(dirs) >= 4 and dirs[1] == 'users': + # Assume template "/users///clipper/" + annee = dirs[2] + dep = dirs[3] + dep_fancy = DEPARTMENTS_LIST.get(dep.lower(), '') + data['entrance_year'] = annee + data['department_code'] = dep + data['department'] = dep_fancy + + # Mail + pmail = infos.get('mailRoutingAddress', []) + if pmail: + data['email'] = pmail[0].decode("utf-8") + + # User id + if 'uid' in infos: + data['clipper_uid'] = infos['uid'][0].decode("utf-8").strip().lower() + + return data + + +def get_ldap_infos(clipper_uid): + assert clipper_uid.isalnum() + data = {} + try: + ldap_connection = init_ldap() + + info = ldap_connection.search_s('dc=spi,dc=ens,dc=fr', + ldap.SCOPE_SUBTREE, + ('(uid=%s)' % (clipper_uid,)), + ['cn', + 'mailRoutingAddress', + 'homeDirectory']) + + if len(info) > 0: + data = extract_infos_from_ldap(info[0][1]) + + except ldap.LDAPError: + pass + + return data + + +def get_clipper_email(clipper): + return '{}@clipper.ens.fr'.format(clipper.strip().lower()) + + +def remove_email(user, email): + """ + Removes an email address of a user. + + If it is his primary email address, it sets another email address as + primary, preferably verified. + """ + u_mailaddrs = user.emailaddress_set.filter(user=user) + try: + mailaddr = user.emailaddress_set.get(email=email) + except EmailAddress.DoesNotExist: + return + + if mailaddr.primary: + others = u_mailaddrs.filter(primary=False) + + # Prefer a verified mail. + new_primary = ( + others.filter(verified=True).last() or + others.last() + ) + + if new_primary: + # It also updates 'user.EMAIL_FIELD'. + new_primary.set_as_primary() + else: + user_email(user, '') + user.save() + + mailaddr.delete() diff --git a/example/adapter.py b/example/adapter.py index 3e288e5..5ad5e10 100644 --- a/example/adapter.py +++ b/example/adapter.py @@ -1,11 +1,17 @@ # -*- coding: utf-8 -*- from allauth.account.adapter import DefaultAccountAdapter +from allauth_ens.adapter import LongTermClipperAccountAdapter from allauth.socialaccount.adapter import DefaultSocialAccountAdapter - class AccountAdapter(DefaultAccountAdapter): pass -class SocialAccountAdapter(DefaultSocialAccountAdapter): - pass +class SocialAccountAdapter(LongTermClipperAccountAdapter): + + def get_username(self, clipper, data): + """ + Exception-free version of get_username, so that it works even outside of + the ENS (if no access to LDAP server) + """ + return "{}@{}".format(clipper, data.get('entrance_year', '00')) diff --git a/setup.cfg b/setup.cfg index 179a83e..bfb5cf5 100644 --- a/setup.cfg +++ b/setup.cfg @@ -7,7 +7,6 @@ combine_as_imports = True default_section = THIRDPARTY include_trailing_comma = True known_allauth = allauth -known_future_library = future,six known_django = django known_first_party = allauth_ens multi_line_output = 5 diff --git a/tox.ini b/tox.ini index ec77d8d..513c573 100644 --- a/tox.ini +++ b/tox.ini @@ -15,7 +15,8 @@ deps = django111: django>=1.11,<2.0 django20: django>=2.0,<2.1 coverage - mock ; python_version < "3.0" + fakeldap + mock usedevelop= True commands = python -V