diff --git a/allauth_ens/adapter.py b/allauth_ens/adapter.py index e85d978..a283cc6 100644 --- a/allauth_ens/adapter.py +++ b/allauth_ens/adapter.py @@ -10,10 +10,14 @@ from allauth.socialaccount.models import SocialAccount import ldap -from .utils import extract_infos_from_ldap, get_ldap_infos, get_clipper_email, remove_email, init_ldap +from .utils import ( + extract_infos_from_ldap, get_clipper_email, get_ldap_infos, init_ldap, + remove_email, +) User = get_user_model() + class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter): """ A class to manage the fact that people loose their account at the end of @@ -25,8 +29,8 @@ class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter): If a clipper connection has already existed with the uid, it checks that this connection still belongs to the user it was associated with. - This check is performed by comparing the generated username corresponding - to this connection with the old one. + This check is performed by comparing the generated username + corresponding to this connection with the old one. If the check succeeds, it simply reactivates the clipper connection as belonging to the associated user. @@ -59,7 +63,7 @@ class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter): # if a new SocialAccount is created email = ldap_data.get('email', get_clipper_email(clipper_uid)) remove_email(old_conn.user, email) - + return # The admission year is the same, we can update the model and keep @@ -87,9 +91,9 @@ class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter): user.set_unusable_password() clipper_uid = sociallogin.account.uid - ldap_data = sociallogin._ldap_data if hasattr(sociallogin, - '_ldap_data') \ - else get_ldap_infos(clipper_uid) + ldap_data = sociallogin._ldap_data if \ + hasattr(sociallogin, '_ldap_data') \ + else get_ldap_infos(clipper_uid) username = self.get_username(clipper_uid, ldap_data) email = ldap_data.get('email', get_clipper_email(clipper_uid)) @@ -147,17 +151,18 @@ def install_longterm_adapter(fake=False): accounts = {u.username: u for u in User.objects.all() if u.username.isalnum()} - l = init_ldap() + ldap_connection = init_ldap() ltc_adapter = get_adapter() - info = l.search_s('dc=spi,dc=ens,dc=fr', - ldap.SCOPE_SUBTREE, - ("(|{})".format(''.join(("(uid=%s)" % (un,)) - for un in accounts.keys()))), - ['uid', - 'cn', - 'mailRoutingAddress', - 'homeDirectory']) + info = ldap_connection.search_s( + 'dc=spi,dc=ens,dc=fr', + ldap.SCOPE_SUBTREE, + ("(|{})".format(''.join(("(uid=%s)" % (un,)) + for un in accounts.keys()))), + ['uid', + 'cn', + 'mailRoutingAddress', + 'homeDirectory']) logs = {"created": [], "updated": []} cases = [] diff --git a/allauth_ens/tests.py b/allauth_ens/tests.py index d9c01e5..805e627 100644 --- a/allauth_ens/tests.py +++ b/allauth_ens/tests.py @@ -165,12 +165,12 @@ class LongTermClipperTests(CASTestCase): 'uid': [username], 'cn': [b'John Smith'], 'mailRoutingAddress': [b'test@clipper.ens.fr'], - 'homeDirectory': [b'/users/%d/phy/test/' % promo], + 'homeDirectory': [bytes('/users/%d/phy/test/' % promo)], } def _count_ldap_queries(self): queries = _mock_ldap.ldap_methods_called() - count = len([l for l in queries if l != 'set_option']) + count = len([op for op in queries if op != 'set_option']) return count def test_new_connexion(self): @@ -267,13 +267,13 @@ class LongTermClipperTests(CASTestCase): def test_multiple_deprecation(self): self._setup_ldap(12) - r = self.client_cas_login(self.client, provider_id="clipper", - username="test") + self.client_cas_login(self.client, provider_id="clipper", + username="test") self.client.logout() self._setup_ldap(15, "truc") - r = self.client_cas_login(self.client, provider_id="clipper", - username="truc") + self.client_cas_login(self.client, provider_id="clipper", + username="truc") self.client.logout() sa0 = SocialAccount.objects.count() @@ -296,8 +296,9 @@ class LongTermClipperTests(CASTestCase): def test_longterm_installer_from_allauth(self): self._setup_ldap(12) - with self.settings(SOCIALACCOUNT_ADAPTER= - 'allauth.socialaccount.adapter.DefaultSocialAccountAdapter'): + with self.settings( + SOCIALACCOUNT_ADAPTER='allauth.socialaccount.' + 'adapter.DefaultSocialAccountAdapter'): r = self.client_cas_login(self.client, provider_id="clipper", username='test') user0 = r.context["user"] @@ -305,9 +306,9 @@ class LongTermClipperTests(CASTestCase): self.assertEqual(user0.username, "test") self.client.logout() - l = install_longterm_adapter() + outputs = install_longterm_adapter() - self.assertEqual(l["updated"], [("test", "test@12")]) + self.assertEqual(outputs["updated"], [("test", "test@12")]) r = self.client_cas_login(self.client, provider_id="clipper", username='test') user1 = r.context["user"] @@ -317,17 +318,18 @@ class LongTermClipperTests(CASTestCase): self.assertEqual(user1.username, "test@12") def test_longterm_installer_from_djangocas(self): - with self.settings(SOCIALACCOUNT_ADAPTER= - 'allauth.socialaccount.adapter.DefaultSocialAccountAdapter'): + with self.settings( + SOCIALACCOUNT_ADAPTER='allauth.socialaccount.' + 'adapter.DefaultSocialAccountAdapter'): user0 = User.objects.create_user('test', 'test@clipper.ens.fr', 'test') nsa0 = SocialAccount.objects.count() self._setup_ldap(12) - l = install_longterm_adapter() + outputs = install_longterm_adapter() - self.assertEqual(l["created"], [("test", "test@12")]) + self.assertEqual(outputs["created"], [("test", "test@12")]) r = self.client_cas_login(self.client, provider_id="clipper", username='test') user1 = r.context["user"] diff --git a/allauth_ens/utils.py b/allauth_ens/utils.py index 230d55d..afd29a9 100644 --- a/allauth_ens/utils.py +++ b/allauth_ens/utils.py @@ -20,23 +20,26 @@ DEPARTMENTS_LIST = { 'pei': u'PEI', } + def init_ldap(): - server = getattr(settings, "CLIPPER_LDAP_SERVER", "ldaps://ldap.spi.ens.fr:636") + server = getattr(settings, "CLIPPER_LDAP_SERVER", + "ldaps://ldap.spi.ens.fr:636") ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) - l = ldap.initialize(server) - l.set_option(ldap.OPT_REFERRALS, 0) - l.set_option(ldap.OPT_PROTOCOL_VERSION, 3) - l.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND) - l.set_option(ldap.OPT_X_TLS_DEMAND, True) - l.set_option(ldap.OPT_DEBUG_LEVEL, 255) - l.set_option(ldap.OPT_NETWORK_TIMEOUT, 10) - l.set_option(ldap.OPT_TIMEOUT, 10) - return l + ldap_connection = ldap.initialize(server) + ldap_connection.set_option(ldap.OPT_REFERRALS, 0) + ldap_connection.set_option(ldap.OPT_PROTOCOL_VERSION, 3) + ldap_connection.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND) + ldap_connection.set_option(ldap.OPT_X_TLS_DEMAND, True) + ldap_connection.set_option(ldap.OPT_DEBUG_LEVEL, 255) + ldap_connection.set_option(ldap.OPT_NETWORK_TIMEOUT, 10) + ldap_connection.set_option(ldap.OPT_TIMEOUT, 10) + return ldap_connection + def extract_infos_from_ldap(infos): data = {} - + # Name if 'cn' in infos: data['name'] = infos['cn'][0].decode("utf-8") @@ -57,25 +60,26 @@ def extract_infos_from_ldap(infos): pmail = infos.get('mailRoutingAddress', []) if pmail: data['email'] = pmail[0].decode("utf-8") - + # User id if 'uid' in infos: data['clipper_uid'] = infos['uid'][0].decode("utf-8").strip().lower() - + return data + def get_ldap_infos(clipper_uid): assert clipper_uid.isalnum() data = {} try: - l = init_ldap() + ldap_connection = init_ldap() - info = l.search_s('dc=spi,dc=ens,dc=fr', - ldap.SCOPE_SUBTREE, - ('(uid=%s)' % (clipper_uid,)), - ['cn', - 'mailRoutingAddress', - 'homeDirectory']) + info = ldap_connection.search_s('dc=spi,dc=ens,dc=fr', + ldap.SCOPE_SUBTREE, + ('(uid=%s)' % (clipper_uid,)), + ['cn', + 'mailRoutingAddress', + 'homeDirectory']) if len(info) > 0: data = extract_infos_from_ldap(info[0][1]) @@ -85,9 +89,11 @@ def get_ldap_infos(clipper_uid): return data + def get_clipper_email(clipper): return '{}@clipper.ens.fr'.format(clipper.strip().lower()) + def remove_email(user, email): """ Removes an email address of a user.