Merge branch 'Evarin/archicubes' into 'master'
Un adapter django-allauth pour gérer les comptes en fin de scolarité et la possible réattribution des logins clippers. See merge request cof-geek/django-allauth-ens!3
This commit is contained in:
commit
bc2b606288
13 changed files with 718 additions and 47 deletions
65
README.rst
65
README.rst
|
@ -148,23 +148,72 @@ Configuration
|
|||
|
||||
SOCIALACCOUNT_PROVIDERS = {
|
||||
# …
|
||||
|
||||
'clipper': {
|
||||
|
||||
# These settings control whether a message containing a link to
|
||||
# disconnect from the CAS server is added when users log out.
|
||||
'MESSAGE_SUGGEST_LOGOUT_ON_LOGOUT': True,
|
||||
'MESSAGE_SUGGEST_LOGOUT_ON_LOGOUT_LEVEL': messages.INFO,
|
||||
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
Auto-signup
|
||||
Poulated data
|
||||
Populated data
|
||||
- username: ``<clipper>``
|
||||
- email (primary and verified): ``<clipper>@clipper.ens.fr``
|
||||
|
||||
********
|
||||
Adapters
|
||||
********
|
||||
|
||||
Long Term Clipper Adapter
|
||||
=========================
|
||||
|
||||
We provide an easy-to-use SocialAccountAdapter to handle the fact that Clipper
|
||||
accounts are not eternal, and that there is no guarantee that the clipper
|
||||
usernames won't be reused later.
|
||||
|
||||
This adapter also handles getting basic information about the user from SPI's
|
||||
LDAP.
|
||||
|
||||
**Important:** If you are building on top of *allauth*, take care to preserve
|
||||
the ``extra_data['ldap']`` of ``SocialAccount`` instances related to *Clipper*
|
||||
(where ``provider_id`` is ``clipper`` or ``clipper_inactive``).
|
||||
|
||||
Configuration
|
||||
Set ``SOCIALACCOUNT_ADAPTER='allauth_ens.adapter.LongTermClipperAccountAdapter'``
|
||||
in `settings.py`
|
||||
|
||||
Auto-signup
|
||||
Populated data
|
||||
- *username*: ``<clipper>@<entrance year>``
|
||||
- *email*: from LDAP's *mailRoutingAddress* field, or ``<clipper>@clipper.ens.fr``
|
||||
- *first_name*, *last_name* from LDAP's *cn* field
|
||||
- *entrance_year* (as 2-digit string), *department_code*, *department* and *promotion* (department+year) parsed from LDAP's *homeDirectory* field
|
||||
- *extra_data* in SocialAccount instance, containing all these field except *promotion* (and available only on first connection)
|
||||
|
||||
Account deprecation
|
||||
At the beginning of each year (i.e. early November), to prevent clipper
|
||||
username conflicts, you should run ``$ python manage.py deprecate_clippers``.
|
||||
Every association clipper username <-> user will then be put on hold, and at
|
||||
the first subsequent connection, a verification of the account will be made
|
||||
(using LDAP), so that a known user keeps his account, but a newcomer won't
|
||||
inherit an archicube's.
|
||||
|
||||
Customize
|
||||
You can customize the SocialAccountAdapter by inheriting
|
||||
``allauth_ens.adapter.LongTermClipperAccountAdapter``. You might want to
|
||||
modify ``get_username(clipper, data)`` to change the default username format.
|
||||
By default, ``get_username`` raises a ``ValueError`` when the connexion to the
|
||||
LDAP failed or did not allow to retrieve the user's entrance year. Overriding
|
||||
``get_username`` (as done in the example website) allows to get rid of that
|
||||
behaviour, and for instance attribute a default entrance year.
|
||||
|
||||
Initial migration
|
||||
If you used allauth without LongTermClipperAccountAdapter, or another CAS
|
||||
interface to log in, you need to update the Users to the new username policy,
|
||||
and (in the second case) to create the SocialAccount instances to link CAS and
|
||||
Users. This can be done easily with ``$ python manage.py install_longterm``.
|
||||
|
||||
*********
|
||||
Demo Site
|
||||
*********
|
||||
|
@ -201,7 +250,11 @@ Tests
|
|||
Local environment
|
||||
-----------------
|
||||
|
||||
``$ ./runtests.py``
|
||||
Requirements
|
||||
* fakeldap and mock, install with ``$ pip install mock fakeldap``
|
||||
|
||||
Run
|
||||
* ``$ ./runtests.py``
|
||||
|
||||
All
|
||||
---
|
||||
|
|
206
allauth_ens/adapter.py
Normal file
206
allauth_ens/adapter.py
Normal file
|
@ -0,0 +1,206 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
from django.contrib.auth import get_user_model
|
||||
|
||||
from allauth.account.utils import user_email, user_field, user_username
|
||||
from allauth.socialaccount.adapter import (
|
||||
DefaultSocialAccountAdapter, get_account_adapter, get_adapter,
|
||||
)
|
||||
from allauth.socialaccount.models import SocialAccount
|
||||
|
||||
import ldap
|
||||
|
||||
from .utils import (
|
||||
extract_infos_from_ldap, get_clipper_email, get_ldap_infos, init_ldap,
|
||||
remove_email,
|
||||
)
|
||||
|
||||
User = get_user_model()
|
||||
|
||||
|
||||
class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter):
|
||||
"""
|
||||
A class to manage the fact that people loose their account at the end of
|
||||
their scolarity and that their clipper login might be reused later
|
||||
"""
|
||||
|
||||
def pre_social_login(self, request, sociallogin):
|
||||
"""
|
||||
If a clipper connection has already existed with the uid, it checks
|
||||
that this connection still belongs to the user it was associated with.
|
||||
|
||||
This check is performed by comparing the entrance years provided by the
|
||||
LDAP.
|
||||
|
||||
If the check succeeds, it simply reactivates the clipper connection as
|
||||
belonging to the associated user.
|
||||
|
||||
If the check fails, it frees the elements (as the clipper email
|
||||
address) which will be assigned to the new connection later.
|
||||
"""
|
||||
|
||||
if sociallogin.account.provider != "clipper":
|
||||
return super(LongTermClipperAccountAdapter,
|
||||
self).pre_social_login(request, sociallogin)
|
||||
|
||||
clipper_uid = sociallogin.account.uid
|
||||
try:
|
||||
old_conn = SocialAccount.objects.get(provider='clipper_inactive',
|
||||
uid=clipper_uid)
|
||||
except SocialAccount.DoesNotExist:
|
||||
return
|
||||
|
||||
# An account with that uid was registered, but potentially
|
||||
# deprecated at the beginning of the year
|
||||
# We need to check that the user is still the same as before
|
||||
ldap_data = get_ldap_infos(clipper_uid)
|
||||
sociallogin._ldap_data = ldap_data
|
||||
|
||||
if ldap_data is None or 'entrance_year' not in ldap_data:
|
||||
raise ValueError("No entrance year in LDAP data")
|
||||
|
||||
old_conn_entrance_year = (
|
||||
old_conn.extra_data.get('ldap', {}).get('entrance_year'))
|
||||
|
||||
if old_conn_entrance_year != ldap_data['entrance_year']:
|
||||
# We cannot reuse this SocialAccount, so we need to invalidate
|
||||
# the email address of the previous user to prevent conflicts
|
||||
# if a new SocialAccount is created
|
||||
email = ldap_data.get('email', get_clipper_email(clipper_uid))
|
||||
remove_email(old_conn.user, email)
|
||||
|
||||
return
|
||||
|
||||
# The admission year is the same, we can update the model and keep
|
||||
# the previous SocialAccount instance
|
||||
old_conn.provider = 'clipper'
|
||||
old_conn.save()
|
||||
|
||||
# Redo the thing that had failed just before
|
||||
sociallogin.lookup()
|
||||
|
||||
def get_username(self, clipper_uid, data):
|
||||
"""
|
||||
Util function to generate a unique username, by default 'clipper@promo'
|
||||
"""
|
||||
if data is None or 'entrance_year' not in data:
|
||||
raise ValueError("No entrance year in LDAP data")
|
||||
return "{}@{}".format(clipper_uid, data['entrance_year'])
|
||||
|
||||
def save_user(self, request, sociallogin, form=None):
|
||||
if sociallogin.account.provider != "clipper":
|
||||
return super(LongTermClipperAccountAdapter,
|
||||
self).save_user(request, sociallogin, form)
|
||||
user = sociallogin.user
|
||||
user.set_unusable_password()
|
||||
|
||||
clipper_uid = sociallogin.account.uid
|
||||
ldap_data = sociallogin._ldap_data if \
|
||||
hasattr(sociallogin, '_ldap_data') \
|
||||
else get_ldap_infos(clipper_uid)
|
||||
|
||||
username = self.get_username(clipper_uid, ldap_data)
|
||||
email = ldap_data.get('email', get_clipper_email(clipper_uid))
|
||||
name = ldap_data.get('name')
|
||||
user_username(user, username or '')
|
||||
user_email(user, email or '')
|
||||
name_parts = (name or '').split(' ')
|
||||
user_field(user, 'first_name', name_parts[0])
|
||||
user_field(user, 'last_name', ' '.join(name_parts[1:]))
|
||||
|
||||
# Entrance year and department, if the user has these fields
|
||||
entrance_year = ldap_data.get('entrance_year', '')
|
||||
dep_code = ldap_data.get('department_code', '')
|
||||
dep_fancy = ldap_data.get('department', '')
|
||||
promotion = u'%s %s' % (dep_fancy, entrance_year)
|
||||
user_field(user, 'entrance_year', entrance_year)
|
||||
user_field(user, 'department_code', dep_code)
|
||||
user_field(user, 'department', dep_fancy)
|
||||
user_field(user, 'promotion', promotion)
|
||||
|
||||
# Ignore form
|
||||
get_account_adapter().populate_username(request, user)
|
||||
|
||||
# Save extra data (only once)
|
||||
sociallogin.account.extra_data['ldap'] = ldap_data
|
||||
sociallogin.save(request)
|
||||
sociallogin.account.save()
|
||||
|
||||
return user
|
||||
|
||||
|
||||
def deprecate_clippers():
|
||||
"""
|
||||
Marks all the SocialAccount with clipper as deprecated, by setting their
|
||||
provider to 'clipper_inactive'
|
||||
"""
|
||||
|
||||
clippers = SocialAccount.objects.filter(provider='clipper')
|
||||
c_uids = clippers.values_list('uid', flat=True)
|
||||
|
||||
# Clear old clipper accounts that were replaced by new ones
|
||||
# (to avoid conflicts)
|
||||
SocialAccount.objects.filter(provider='clipper_inactive',
|
||||
uid__in=c_uids).delete()
|
||||
|
||||
# Deprecate accounts
|
||||
clippers.update(provider='clipper_inactive')
|
||||
|
||||
|
||||
def install_longterm_adapter(fake=False):
|
||||
"""
|
||||
Manages the transition from an older django_cas or an allauth_ens
|
||||
installation without LongTermClipperAccountAdapter
|
||||
"""
|
||||
|
||||
accounts = {u.username: u for u in User.objects.all()
|
||||
if u.username.isalnum()}
|
||||
ldap_connection = init_ldap()
|
||||
ltc_adapter = get_adapter()
|
||||
|
||||
info = ldap_connection.search_s(
|
||||
'dc=spi,dc=ens,dc=fr',
|
||||
ldap.SCOPE_SUBTREE,
|
||||
("(|{})".format(''.join(("(uid=%s)" % (un,))
|
||||
for un in accounts.keys()))),
|
||||
['uid',
|
||||
'cn',
|
||||
'mailRoutingAddress',
|
||||
'homeDirectory'])
|
||||
|
||||
logs = {"created": [], "updated": []}
|
||||
cases = []
|
||||
|
||||
for userinfo in info:
|
||||
infos = userinfo[1]
|
||||
data = extract_infos_from_ldap(infos)
|
||||
clipper_uid = data['clipper_uid']
|
||||
user = accounts.get(clipper_uid, None)
|
||||
if user is None:
|
||||
continue
|
||||
user.username = ltc_adapter.get_username(clipper_uid, data)
|
||||
if fake:
|
||||
cases.append(clipper_uid)
|
||||
else:
|
||||
user.save()
|
||||
cases.append(user.username)
|
||||
|
||||
try:
|
||||
sa = SocialAccount.objects.get(provider='clipper', uid=clipper_uid)
|
||||
if not sa.extra_data.get('ldap'):
|
||||
sa.extra_data['ldap'] = data
|
||||
if not fake:
|
||||
sa.save(update_fields=['extra_data'])
|
||||
logs["updated"].append((clipper_uid, user.username))
|
||||
except SocialAccount.DoesNotExist:
|
||||
sa = SocialAccount(
|
||||
provider='clipper', uid=clipper_uid,
|
||||
user=user, extra_data={'ldap': data},
|
||||
)
|
||||
if not fake:
|
||||
sa.save()
|
||||
logs["created"].append((clipper_uid, user.username))
|
||||
|
||||
logs["unmodified"] = User.objects.exclude(username__in=cases)\
|
||||
.values_list("username", flat=True)
|
||||
return logs
|
0
allauth_ens/management/__init__.py
Normal file
0
allauth_ens/management/__init__.py
Normal file
0
allauth_ens/management/commands/__init__.py
Normal file
0
allauth_ens/management/commands/__init__.py
Normal file
16
allauth_ens/management/commands/deprecate_clippers.py
Normal file
16
allauth_ens/management/commands/deprecate_clippers.py
Normal file
|
@ -0,0 +1,16 @@
|
|||
# coding: utf-8
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
from allauth_ens.adapter import deprecate_clippers
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = 'Deprecates clipper SocialAccounts so as to avoid conflicts'
|
||||
|
||||
def add_arguments(self, parser):
|
||||
pass
|
||||
|
||||
def handle(self, *args, **options):
|
||||
deprecate_clippers()
|
||||
self.stdout.write(self.style.SUCCESS(
|
||||
'Clippers deprecation successful'))
|
35
allauth_ens/management/commands/install_longterm.py
Normal file
35
allauth_ens/management/commands/install_longterm.py
Normal file
|
@ -0,0 +1,35 @@
|
|||
# coding: utf-8
|
||||
from django.core.management.base import BaseCommand
|
||||
|
||||
from allauth_ens.adapter import install_longterm_adapter
|
||||
|
||||
|
||||
class Command(BaseCommand):
|
||||
help = 'Manages the transition from an older django_cas' \
|
||||
'or an allauth_ens installation without ' \
|
||||
'LongTermClipperAccountAdapter'
|
||||
|
||||
def add_arguments(self, parser):
|
||||
parser.add_argument(
|
||||
'--fake',
|
||||
action='store_true',
|
||||
default=False,
|
||||
help=('Does not save the models created/updated,'
|
||||
'only shows the list'),
|
||||
)
|
||||
pass
|
||||
|
||||
def handle(self, *args, **options):
|
||||
logs = install_longterm_adapter(options.get("fake", False))
|
||||
self.stdout.write("Social accounts created : %d"
|
||||
% len(logs["created"]))
|
||||
self.stdout.write(" ".join(("%s -> %s" % s) for s in logs["created"]))
|
||||
self.stdout.write("Social accounts displaced : %d"
|
||||
% len(logs["updated"]))
|
||||
self.stdout.write(" ".join(("%s -> %s" % s) for s in logs["updated"]))
|
||||
self.stdout.write("User accounts unmodified : %d"
|
||||
% len(logs["unmodified"]))
|
||||
self.stdout.write(" ".join(logs["unmodified"]))
|
||||
|
||||
self.stdout.write(self.style.SUCCESS(
|
||||
"LongTermClipper migration successful"))
|
|
@ -1,11 +1,8 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
import ldap
|
||||
|
||||
from allauth.account.models import EmailAddress
|
||||
from allauth.socialaccount.providers.base import ProviderAccount
|
||||
from allauth_cas.providers import CASProvider
|
||||
|
||||
from django.conf import settings
|
||||
from allauth_cas.providers import CASProvider
|
||||
|
||||
|
||||
class ClipperAccount(ProviderAccount):
|
||||
|
@ -21,41 +18,14 @@ class ClipperProvider(CASProvider):
|
|||
uid, extra = data
|
||||
return '{}@clipper.ens.fr'.format(uid.strip().lower())
|
||||
|
||||
def extract_uid(self, data):
|
||||
uid, _ = data
|
||||
uid = uid.lower().strip()
|
||||
return uid
|
||||
|
||||
def extract_common_fields(self, data):
|
||||
def get_names(clipper):
|
||||
assert clipper.isalnum()
|
||||
try:
|
||||
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT,
|
||||
ldap.OPT_X_TLS_NEVER)
|
||||
l = ldap.initialize("ldaps://ldap.spi.ens.fr:636")
|
||||
l.set_option(ldap.OPT_REFERRALS, 0)
|
||||
l.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
|
||||
l.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND)
|
||||
l.set_option(ldap.OPT_X_TLS_DEMAND, True)
|
||||
l.set_option(ldap.OPT_DEBUG_LEVEL, 255)
|
||||
l.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
|
||||
l.set_option(ldap.OPT_TIMEOUT, 10)
|
||||
|
||||
info = l.search_s('dc=spi,dc=ens,dc=fr',
|
||||
ldap.SCOPE_SUBTREE,
|
||||
('(uid=%s)' % (clipper,)),
|
||||
[str("cn"), ])
|
||||
|
||||
if len(info) > 0:
|
||||
fullname = info[0][1].get('cn', [''])[0].decode("utf-8")
|
||||
first_name, last_name = fullname.split(' ', 1)
|
||||
return first_name, last_name
|
||||
|
||||
except ldap.LDAPError:
|
||||
pass
|
||||
|
||||
return '', ''
|
||||
|
||||
common = super(ClipperProvider, self).extract_common_fields(data)
|
||||
fn, ln = get_names(common['username'])
|
||||
common['email'] = self.extract_email(data)
|
||||
common['name'] = fn
|
||||
common['last_name'] = ln
|
||||
return common
|
||||
|
||||
def extract_email_addresses(self, data):
|
||||
|
@ -67,8 +37,23 @@ class ClipperProvider(CASProvider):
|
|||
]
|
||||
|
||||
def extract_extra_data(self, data):
|
||||
"""
|
||||
If LongTermClipperAccountAdapter is in use, keep the data retrieved
|
||||
from the LDAP server.
|
||||
"""
|
||||
from allauth.socialaccount.models import SocialAccount # noqa
|
||||
extra_data = super(ClipperProvider, self).extract_extra_data(data)
|
||||
extra_data['email'] = self.extract_email(data)
|
||||
|
||||
# Preserve LDAP data at all cost.
|
||||
try:
|
||||
clipper_account = SocialAccount.objects.get(
|
||||
provider=self.id, uid=self.extract_uid(data))
|
||||
if 'ldap' in clipper_account.extra_data:
|
||||
extra_data['ldap'] = clipper_account.extra_data['ldap']
|
||||
except SocialAccount.DoesNotExist:
|
||||
pass
|
||||
|
||||
return extra_data
|
||||
|
||||
def message_suggest_caslogout_on_logout(self, request):
|
||||
|
|
|
@ -17,6 +17,18 @@ class ClipperProviderTests(CASTestCase):
|
|||
u = User.objects.get(username='clipper_uid')
|
||||
self.assertEqual(u.email, 'clipper_uid@clipper.ens.fr')
|
||||
|
||||
def test_extra_data_keeps_ldap_data(self):
|
||||
clipper_conn = self.u.socialaccount_set.create(
|
||||
uid='user', provider='clipper',
|
||||
extra_data={'ldap': {'aa': 'bb'}},
|
||||
)
|
||||
|
||||
self.client_cas_login(
|
||||
self.client, provider_id='clipper', username='user')
|
||||
|
||||
clipper_conn.refresh_from_db()
|
||||
self.assertEqual(clipper_conn.extra_data['ldap'], {'aa': 'bb'})
|
||||
|
||||
|
||||
class ClipperViewsTests(CASViewTestCase):
|
||||
|
||||
|
|
|
@ -1,3 +1,5 @@
|
|||
from __future__ import unicode_literals
|
||||
|
||||
import re
|
||||
|
||||
import django
|
||||
|
@ -6,6 +8,19 @@ from django.contrib.sites.models import Site
|
|||
from django.core import mail
|
||||
from django.test import TestCase, override_settings
|
||||
|
||||
from allauth.socialaccount.models import SocialAccount
|
||||
|
||||
import six
|
||||
from allauth_cas.test.testcases import CASTestCase
|
||||
from fakeldap import MockLDAP
|
||||
from mock import patch
|
||||
|
||||
from .adapter import deprecate_clippers, install_longterm_adapter
|
||||
|
||||
_mock_ldap = MockLDAP()
|
||||
ldap_patcher = patch('allauth_ens.utils.ldap.initialize',
|
||||
lambda x: _mock_ldap)
|
||||
|
||||
if django.VERSION >= (1, 10):
|
||||
from django.urls import reverse
|
||||
else:
|
||||
|
@ -135,3 +150,220 @@ class ViewsTests(TestCase):
|
|||
def test_account_reset_password_from_key_done(self):
|
||||
r = self.client.get(reverse('account_reset_password_from_key_done'))
|
||||
self.assertEqual(r.status_code, 200)
|
||||
|
||||
|
||||
@override_settings(
|
||||
SOCIALACCOUNT_ADAPTER='allauth_ens.adapter.LongTermClipperAccountAdapter'
|
||||
)
|
||||
class LongTermClipperTests(CASTestCase):
|
||||
def setUp(self):
|
||||
ldap_patcher.start()
|
||||
|
||||
def tearDown(self):
|
||||
ldap_patcher.stop()
|
||||
_mock_ldap.reset()
|
||||
|
||||
def _setup_ldap(self, promo=12, username="test"):
|
||||
try:
|
||||
buid = six.binary_type(username, 'utf-8')
|
||||
home = six.binary_type('/users/%d/phy/test/' % promo, 'utf-8')
|
||||
except TypeError:
|
||||
buid = six.binary_type(username)
|
||||
home = six.binary_type('/users/%d/phy/test/' % promo)
|
||||
_mock_ldap.directory['dc=spi,dc=ens,dc=fr'] = {
|
||||
'uid': [buid],
|
||||
'cn': [b'John Smith'],
|
||||
'mailRoutingAddress': [b'test@clipper.ens.fr'],
|
||||
'homeDirectory': [home],
|
||||
}
|
||||
|
||||
def _count_ldap_queries(self):
|
||||
queries = _mock_ldap.ldap_methods_called()
|
||||
count = len([op for op in queries if op != 'set_option'])
|
||||
return count
|
||||
|
||||
def test_new_connexion(self):
|
||||
self._setup_ldap()
|
||||
|
||||
r = self.client_cas_login(self.client, provider_id="clipper",
|
||||
username="test")
|
||||
u = r.context['user']
|
||||
|
||||
self.assertEqual(u.username, "test@12")
|
||||
self.assertEqual(u.first_name, "John")
|
||||
self.assertEqual(u.last_name, "Smith")
|
||||
self.assertEqual(u.email, "test@clipper.ens.fr")
|
||||
self.assertEqual(self._count_ldap_queries(), 1)
|
||||
|
||||
sa = list(SocialAccount.objects.all())[-1]
|
||||
self.assertEqual(sa.user.id, u.id)
|
||||
self.assertEqual(sa.extra_data['ldap']['entrance_year'], '12')
|
||||
|
||||
def test_connect_disconnect(self):
|
||||
self._setup_ldap()
|
||||
r0 = self.client_cas_login(self.client, provider_id="clipper",
|
||||
username="test")
|
||||
self.assertIn("_auth_user_id", self.client.session)
|
||||
self.assertIn('user', r0.context)
|
||||
|
||||
self.client.logout()
|
||||
self.assertNotIn("_auth_user_id", self.client.session)
|
||||
|
||||
def test_second_connexion(self):
|
||||
self._setup_ldap()
|
||||
|
||||
self.client_cas_login(self.client, provider_id="clipper",
|
||||
username="test")
|
||||
self.client.logout()
|
||||
|
||||
nu = User.objects.count()
|
||||
|
||||
self.client_cas_login(self.client, provider_id="clipper",
|
||||
username="test")
|
||||
self.assertEqual(User.objects.count(), nu)
|
||||
self.assertEqual(self._count_ldap_queries(), 1)
|
||||
|
||||
def test_deprecation(self):
|
||||
self._setup_ldap()
|
||||
self.client_cas_login(self.client, provider_id="clipper",
|
||||
username="test")
|
||||
deprecate_clippers()
|
||||
|
||||
sa = SocialAccount.objects.all()[0]
|
||||
self.assertEqual(sa.provider, "clipper_inactive")
|
||||
|
||||
def test_reconnect_after_deprecation(self):
|
||||
self._setup_ldap()
|
||||
r = self.client_cas_login(self.client, provider_id="clipper",
|
||||
username="test")
|
||||
user0 = r.context['user']
|
||||
n_sa0 = SocialAccount.objects.count()
|
||||
n_u0 = User.objects.count()
|
||||
self.client.logout()
|
||||
|
||||
deprecate_clippers()
|
||||
|
||||
r = self.client_cas_login(self.client, provider_id="clipper",
|
||||
username="test")
|
||||
user1 = r.context['user']
|
||||
sa1 = list(SocialAccount.objects.all())
|
||||
n_u1 = User.objects.count()
|
||||
self.assertEqual(len(sa1), n_sa0)
|
||||
self.assertEqual(n_u1, n_u0)
|
||||
self.assertEqual(user1.id, user0.id)
|
||||
self.assertEqual(self._count_ldap_queries(), 2)
|
||||
|
||||
def test_override_inactive_account(self):
|
||||
self._setup_ldap(12)
|
||||
r = self.client_cas_login(self.client, provider_id="clipper",
|
||||
username="test")
|
||||
user0 = r.context['user']
|
||||
n_sa0 = SocialAccount.objects.count()
|
||||
n_u0 = User.objects.count()
|
||||
self.client.logout()
|
||||
|
||||
deprecate_clippers()
|
||||
|
||||
self._setup_ldap(13)
|
||||
r = self.client_cas_login(self.client, provider_id="clipper",
|
||||
username="test")
|
||||
|
||||
user1 = r.context['user']
|
||||
sa1 = list(SocialAccount.objects.all())
|
||||
n_u1 = User.objects.count()
|
||||
self.assertEqual(len(sa1), n_sa0 + 1)
|
||||
self.assertEqual(n_u1, n_u0 + 1)
|
||||
self.assertNotEqual(user1.id, user0.id)
|
||||
|
||||
def test_multiple_deprecation(self):
|
||||
self._setup_ldap(12)
|
||||
self.client_cas_login(self.client, provider_id="clipper",
|
||||
username="test")
|
||||
self.client.logout()
|
||||
|
||||
self._setup_ldap(15, "truc")
|
||||
self.client_cas_login(self.client, provider_id="clipper",
|
||||
username="truc")
|
||||
self.client.logout()
|
||||
sa0 = SocialAccount.objects.count()
|
||||
|
||||
deprecate_clippers()
|
||||
|
||||
self._setup_ldap(13)
|
||||
self.client_cas_login(self.client, provider_id="clipper",
|
||||
username="test")
|
||||
self.client.logout()
|
||||
|
||||
sa1 = SocialAccount.objects.count()
|
||||
|
||||
deprecate_clippers()
|
||||
sa2 = SocialAccount.objects.count()
|
||||
|
||||
# Older "test" inactive SocialAccount gets erased by new one
|
||||
# while "truc" remains
|
||||
self.assertEqual(sa0, sa2)
|
||||
self.assertEqual(sa1, sa0 + 1)
|
||||
|
||||
def test_longterm_installer_from_allauth(self):
|
||||
self._setup_ldap(12)
|
||||
with self.settings(
|
||||
SOCIALACCOUNT_ADAPTER='allauth.socialaccount.'
|
||||
'adapter.DefaultSocialAccountAdapter'):
|
||||
r = self.client_cas_login(self.client, provider_id="clipper",
|
||||
username='test')
|
||||
user0 = r.context["user"]
|
||||
nsa0 = SocialAccount.objects.count()
|
||||
self.assertEqual(user0.username, "test")
|
||||
self.client.logout()
|
||||
|
||||
outputs = install_longterm_adapter()
|
||||
|
||||
self.assertEqual(outputs["updated"], [("test", "test@12")])
|
||||
r = self.client_cas_login(self.client, provider_id="clipper",
|
||||
username='test')
|
||||
user1 = r.context["user"]
|
||||
nsa1 = SocialAccount.objects.count()
|
||||
conn = user1.socialaccount_set.get(provider='clipper')
|
||||
self.assertEqual(user1.id, user0.id)
|
||||
self.assertEqual(nsa1, nsa0)
|
||||
self.assertEqual(user1.username, "test@12")
|
||||
self.assertEqual(conn.extra_data['ldap']['entrance_year'], '12')
|
||||
|
||||
def test_longterm_installer_from_djangocas(self):
|
||||
with self.settings(
|
||||
SOCIALACCOUNT_ADAPTER='allauth.socialaccount.'
|
||||
'adapter.DefaultSocialAccountAdapter'):
|
||||
user0 = User.objects.create_user('test', 'test@clipper.ens.fr',
|
||||
'test')
|
||||
nsa0 = SocialAccount.objects.count()
|
||||
|
||||
self._setup_ldap(12)
|
||||
|
||||
outputs = install_longterm_adapter()
|
||||
|
||||
self.assertEqual(outputs["created"], [("test", "test@12")])
|
||||
r = self.client_cas_login(self.client, provider_id="clipper",
|
||||
username='test')
|
||||
user1 = r.context["user"]
|
||||
nsa1 = SocialAccount.objects.count()
|
||||
conn = user1.socialaccount_set.get(provider='clipper')
|
||||
self.assertEqual(user1.id, user0.id)
|
||||
self.assertEqual(nsa1, nsa0 + 1)
|
||||
self.assertEqual(user1.username, "test@12")
|
||||
self.assertEqual(conn.extra_data['ldap']['entrance_year'], '12')
|
||||
|
||||
def test_disconnect_ldap(self):
|
||||
nu0 = User.objects.count()
|
||||
nsa0 = SocialAccount.objects.count()
|
||||
|
||||
ldap_patcher.stop()
|
||||
with self.settings(CLIPPER_LDAP_SERVER=''):
|
||||
self.assertRaises(ValueError, self.client_cas_login,
|
||||
self.client, provider_id="clipper",
|
||||
username="test")
|
||||
|
||||
nu1 = User.objects.count()
|
||||
nsa1 = SocialAccount.objects.count()
|
||||
self.assertEqual(nu0, nu1)
|
||||
self.assertEqual(nsa0, nsa1)
|
||||
ldap_patcher.start()
|
||||
|
|
126
allauth_ens/utils.py
Normal file
126
allauth_ens/utils.py
Normal file
|
@ -0,0 +1,126 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
from allauth.account.models import EmailAddress
|
||||
from allauth.account.utils import user_email
|
||||
|
||||
import ldap
|
||||
|
||||
DEPARTMENTS_LIST = {
|
||||
'phy': u'Physique',
|
||||
'maths': u'Maths',
|
||||
'bio': u'Biologie',
|
||||
'chimie': u'Chimie',
|
||||
'geol': u'Géosciences',
|
||||
'dec': u'DEC',
|
||||
'info': u'Informatique',
|
||||
'litt': u'Littéraire',
|
||||
'guests': u'Pensionnaires étrangers',
|
||||
'pei': u'PEI',
|
||||
}
|
||||
|
||||
|
||||
def init_ldap():
|
||||
server = getattr(settings, "CLIPPER_LDAP_SERVER",
|
||||
"ldaps://ldap.spi.ens.fr:636")
|
||||
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT,
|
||||
ldap.OPT_X_TLS_NEVER)
|
||||
ldap_connection = ldap.initialize(server)
|
||||
ldap_connection.set_option(ldap.OPT_REFERRALS, 0)
|
||||
ldap_connection.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
|
||||
ldap_connection.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND)
|
||||
ldap_connection.set_option(ldap.OPT_X_TLS_DEMAND, True)
|
||||
ldap_connection.set_option(ldap.OPT_DEBUG_LEVEL, 255)
|
||||
ldap_connection.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
|
||||
ldap_connection.set_option(ldap.OPT_TIMEOUT, 10)
|
||||
return ldap_connection
|
||||
|
||||
|
||||
def extract_infos_from_ldap(infos):
|
||||
data = {}
|
||||
|
||||
# Name
|
||||
if 'cn' in infos:
|
||||
data['name'] = infos['cn'][0].decode("utf-8")
|
||||
|
||||
# Parsing homeDirectory to get entrance year and departments
|
||||
if 'homeDirectory' in infos:
|
||||
dirs = infos['homeDirectory'][0].decode("utf-8").split('/')
|
||||
if len(dirs) >= 4 and dirs[1] == 'users':
|
||||
# Assume template "/users/<year>/<department>/clipper/"
|
||||
annee = dirs[2]
|
||||
dep = dirs[3]
|
||||
dep_fancy = DEPARTMENTS_LIST.get(dep.lower(), '')
|
||||
data['entrance_year'] = annee
|
||||
data['department_code'] = dep
|
||||
data['department'] = dep_fancy
|
||||
|
||||
# Mail
|
||||
pmail = infos.get('mailRoutingAddress', [])
|
||||
if pmail:
|
||||
data['email'] = pmail[0].decode("utf-8")
|
||||
|
||||
# User id
|
||||
if 'uid' in infos:
|
||||
data['clipper_uid'] = infos['uid'][0].decode("utf-8").strip().lower()
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def get_ldap_infos(clipper_uid):
|
||||
assert clipper_uid.isalnum()
|
||||
data = {}
|
||||
try:
|
||||
ldap_connection = init_ldap()
|
||||
|
||||
info = ldap_connection.search_s('dc=spi,dc=ens,dc=fr',
|
||||
ldap.SCOPE_SUBTREE,
|
||||
('(uid=%s)' % (clipper_uid,)),
|
||||
['cn',
|
||||
'mailRoutingAddress',
|
||||
'homeDirectory'])
|
||||
|
||||
if len(info) > 0:
|
||||
data = extract_infos_from_ldap(info[0][1])
|
||||
|
||||
except ldap.LDAPError:
|
||||
pass
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def get_clipper_email(clipper):
|
||||
return '{}@clipper.ens.fr'.format(clipper.strip().lower())
|
||||
|
||||
|
||||
def remove_email(user, email):
|
||||
"""
|
||||
Removes an email address of a user.
|
||||
|
||||
If it is his primary email address, it sets another email address as
|
||||
primary, preferably verified.
|
||||
"""
|
||||
u_mailaddrs = user.emailaddress_set.filter(user=user)
|
||||
try:
|
||||
mailaddr = user.emailaddress_set.get(email=email)
|
||||
except EmailAddress.DoesNotExist:
|
||||
return
|
||||
|
||||
if mailaddr.primary:
|
||||
others = u_mailaddrs.filter(primary=False)
|
||||
|
||||
# Prefer a verified mail.
|
||||
new_primary = (
|
||||
others.filter(verified=True).last() or
|
||||
others.last()
|
||||
)
|
||||
|
||||
if new_primary:
|
||||
# It also updates 'user.EMAIL_FIELD'.
|
||||
new_primary.set_as_primary()
|
||||
else:
|
||||
user_email(user, '')
|
||||
user.save()
|
||||
|
||||
mailaddr.delete()
|
|
@ -1,11 +1,17 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from allauth.account.adapter import DefaultAccountAdapter
|
||||
from allauth_ens.adapter import LongTermClipperAccountAdapter
|
||||
from allauth.socialaccount.adapter import DefaultSocialAccountAdapter
|
||||
|
||||
|
||||
class AccountAdapter(DefaultAccountAdapter):
|
||||
pass
|
||||
|
||||
|
||||
class SocialAccountAdapter(DefaultSocialAccountAdapter):
|
||||
pass
|
||||
class SocialAccountAdapter(LongTermClipperAccountAdapter):
|
||||
|
||||
def get_username(self, clipper, data):
|
||||
"""
|
||||
Exception-free version of get_username, so that it works even outside of
|
||||
the ENS (if no access to LDAP server)
|
||||
"""
|
||||
return "{}@{}".format(clipper, data.get('entrance_year', '00'))
|
||||
|
|
|
@ -7,7 +7,6 @@ combine_as_imports = True
|
|||
default_section = THIRDPARTY
|
||||
include_trailing_comma = True
|
||||
known_allauth = allauth
|
||||
known_future_library = future,six
|
||||
known_django = django
|
||||
known_first_party = allauth_ens
|
||||
multi_line_output = 5
|
||||
|
|
3
tox.ini
3
tox.ini
|
@ -15,7 +15,8 @@ deps =
|
|||
django111: django>=1.11,<2.0
|
||||
django20: django>=2.0,<2.1
|
||||
coverage
|
||||
mock ; python_version < "3.0"
|
||||
fakeldap
|
||||
mock
|
||||
usedevelop= True
|
||||
commands =
|
||||
python -V
|
||||
|
|
Loading…
Reference in a new issue