Gérer la fin de scolarité #9
13 changed files with 718 additions and 47 deletions
65
README.rst
65
README.rst
|
@ -148,23 +148,72 @@ Configuration
|
||||||
|
|
||||||
SOCIALACCOUNT_PROVIDERS = {
|
SOCIALACCOUNT_PROVIDERS = {
|
||||||
# …
|
# …
|
||||||
|
|
||||||
'clipper': {
|
'clipper': {
|
||||||
|
|
||||||
# These settings control whether a message containing a link to
|
# These settings control whether a message containing a link to
|
||||||
# disconnect from the CAS server is added when users log out.
|
# disconnect from the CAS server is added when users log out.
|
||||||
'MESSAGE_SUGGEST_LOGOUT_ON_LOGOUT': True,
|
'MESSAGE_SUGGEST_LOGOUT_ON_LOGOUT': True,
|
||||||
'MESSAGE_SUGGEST_LOGOUT_ON_LOGOUT_LEVEL': messages.INFO,
|
'MESSAGE_SUGGEST_LOGOUT_ON_LOGOUT_LEVEL': messages.INFO,
|
||||||
|
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
Auto-signup
|
Auto-signup
|
||||||
Poulated data
|
Populated data
|
||||||
- username: ``<clipper>``
|
- username: ``<clipper>``
|
||||||
- email (primary and verified): ``<clipper>@clipper.ens.fr``
|
- email (primary and verified): ``<clipper>@clipper.ens.fr``
|
||||||
|
|
||||||
|
********
|
||||||
|
Adapters
|
||||||
|
********
|
||||||
|
|
||||||
|
Long Term Clipper Adapter
|
||||||
|
=========================
|
||||||
|
|
||||||
|
We provide an easy-to-use SocialAccountAdapter to handle the fact that Clipper
|
||||||
|
accounts are not eternal, and that there is no guarantee that the clipper
|
||||||
|
usernames won't be reused later.
|
||||||
|
|
||||||
|
This adapter also handles getting basic information about the user from SPI's
|
||||||
|
LDAP.
|
||||||
|
|
||||||
|
**Important:** If you are building on top of *allauth*, take care to preserve
|
||||||
|
the ``extra_data['ldap']`` of ``SocialAccount`` instances related to *Clipper*
|
||||||
|
(where ``provider_id`` is ``clipper`` or ``clipper_inactive``).
|
||||||
|
|
||||||
|
Configuration
|
||||||
|
Set ``SOCIALACCOUNT_ADAPTER='allauth_ens.adapter.LongTermClipperAccountAdapter'``
|
||||||
|
in `settings.py`
|
||||||
|
|
||||||
|
Auto-signup
|
||||||
|
Populated data
|
||||||
|
- *username*: ``<clipper>@<entrance year>``
|
||||||
|
- *email*: from LDAP's *mailRoutingAddress* field, or ``<clipper>@clipper.ens.fr``
|
||||||
|
- *first_name*, *last_name* from LDAP's *cn* field
|
||||||
|
- *entrance_year* (as 2-digit string), *department_code*, *department* and *promotion* (department+year) parsed from LDAP's *homeDirectory* field
|
||||||
|
- *extra_data* in SocialAccount instance, containing all these field except *promotion* (and available only on first connection)
|
||||||
|
|
||||||
|
Account deprecation
|
||||||
|
At the beginning of each year (i.e. early November), to prevent clipper
|
||||||
|
username conflicts, you should run ``$ python manage.py deprecate_clippers``.
|
||||||
|
Every association clipper username <-> user will then be put on hold, and at
|
||||||
|
the first subsequent connection, a verification of the account will be made
|
||||||
|
(using LDAP), so that a known user keeps his account, but a newcomer won't
|
||||||
|
inherit an archicube's.
|
||||||
|
|
||||||
|
Customize
|
||||||
|
You can customize the SocialAccountAdapter by inheriting
|
||||||
|
``allauth_ens.adapter.LongTermClipperAccountAdapter``. You might want to
|
||||||
|
modify ``get_username(clipper, data)`` to change the default username format.
|
||||||
|
By default, ``get_username`` raises a ``ValueError`` when the connexion to the
|
||||||
|
LDAP failed or did not allow to retrieve the user's entrance year. Overriding
|
||||||
|
``get_username`` (as done in the example website) allows to get rid of that
|
||||||
|
behaviour, and for instance attribute a default entrance year.
|
||||||
|
|
||||||
|
Initial migration
|
||||||
|
If you used allauth without LongTermClipperAccountAdapter, or another CAS
|
||||||
|
interface to log in, you need to update the Users to the new username policy,
|
||||||
|
and (in the second case) to create the SocialAccount instances to link CAS and
|
||||||
|
Users. This can be done easily with ``$ python manage.py install_longterm``.
|
||||||
|
|
||||||
*********
|
*********
|
||||||
Demo Site
|
Demo Site
|
||||||
*********
|
*********
|
||||||
|
@ -201,7 +250,11 @@ Tests
|
||||||
Local environment
|
Local environment
|
||||||
-----------------
|
-----------------
|
||||||
|
|
||||||
``$ ./runtests.py``
|
Requirements
|
||||||
|
* fakeldap and mock, install with ``$ pip install mock fakeldap``
|
||||||
|
|
||||||
|
Run
|
||||||
|
* ``$ ./runtests.py``
|
||||||
|
|
||||||
All
|
All
|
||||||
---
|
---
|
||||||
|
|
206
allauth_ens/adapter.py
Normal file
206
allauth_ens/adapter.py
Normal file
|
@ -0,0 +1,206 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
from django.contrib.auth import get_user_model
|
||||||
|
|
||||||
|
from allauth.account.utils import user_email, user_field, user_username
|
||||||
|
from allauth.socialaccount.adapter import (
|
||||||
|
DefaultSocialAccountAdapter, get_account_adapter, get_adapter,
|
||||||
|
)
|
||||||
|
from allauth.socialaccount.models import SocialAccount
|
||||||
|
|
||||||
|
import ldap
|
||||||
|
|
||||||
|
from .utils import (
|
||||||
|
extract_infos_from_ldap, get_clipper_email, get_ldap_infos, init_ldap,
|
||||||
|
remove_email,
|
||||||
|
)
|
||||||
|
|
||||||
|
User = get_user_model()
|
||||||
|
|
||||||
|
|
||||||
|
class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter):
|
||||||
|
"""
|
||||||
|
A class to manage the fact that people loose their account at the end of
|
||||||
|
their scolarity and that their clipper login might be reused later
|
||||||
|
"""
|
||||||
|
|
||||||
|
def pre_social_login(self, request, sociallogin):
|
||||||
|
"""
|
||||||
|
If a clipper connection has already existed with the uid, it checks
|
||||||
|
that this connection still belongs to the user it was associated with.
|
||||||
|
|
||||||
|
This check is performed by comparing the entrance years provided by the
|
||||||
|
LDAP.
|
||||||
|
|
||||||
|
If the check succeeds, it simply reactivates the clipper connection as
|
||||||
|
belonging to the associated user.
|
||||||
|
|
||||||
|
If the check fails, it frees the elements (as the clipper email
|
||||||
|
address) which will be assigned to the new connection later.
|
||||||
|
"""
|
||||||
|
|
||||||
|
if sociallogin.account.provider != "clipper":
|
||||||
|
return super(LongTermClipperAccountAdapter,
|
||||||
|
self).pre_social_login(request, sociallogin)
|
||||||
|
|
||||||
|
clipper_uid = sociallogin.account.uid
|
||||||
|
try:
|
||||||
|
old_conn = SocialAccount.objects.get(provider='clipper_inactive',
|
||||||
|
uid=clipper_uid)
|
||||||
|
except SocialAccount.DoesNotExist:
|
||||||
|
return
|
||||||
|
|
||||||
|
# An account with that uid was registered, but potentially
|
||||||
|
# deprecated at the beginning of the year
|
||||||
|
# We need to check that the user is still the same as before
|
||||||
|
ldap_data = get_ldap_infos(clipper_uid)
|
||||||
|
sociallogin._ldap_data = ldap_data
|
||||||
|
|
||||||
|
if ldap_data is None or 'entrance_year' not in ldap_data:
|
||||||
|
raise ValueError("No entrance year in LDAP data")
|
||||||
|
|
||||||
|
old_conn_entrance_year = (
|
||||||
|
old_conn.extra_data.get('ldap', {}).get('entrance_year'))
|
||||||
|
|
||||||
|
if old_conn_entrance_year != ldap_data['entrance_year']:
|
||||||
|
# We cannot reuse this SocialAccount, so we need to invalidate
|
||||||
|
# the email address of the previous user to prevent conflicts
|
||||||
|
# if a new SocialAccount is created
|
||||||
|
email = ldap_data.get('email', get_clipper_email(clipper_uid))
|
||||||
|
remove_email(old_conn.user, email)
|
||||||
|
|
||||||
|
return
|
||||||
|
|
||||||
|
# The admission year is the same, we can update the model and keep
|
||||||
|
# the previous SocialAccount instance
|
||||||
|
old_conn.provider = 'clipper'
|
||||||
|
old_conn.save()
|
||||||
|
|
||||||
|
# Redo the thing that had failed just before
|
||||||
|
sociallogin.lookup()
|
||||||
|
|
||||||
|
def get_username(self, clipper_uid, data):
|
||||||
|
"""
|
||||||
|
Util function to generate a unique username, by default 'clipper@promo'
|
||||||
|
"""
|
||||||
|
if data is None or 'entrance_year' not in data:
|
||||||
|
raise ValueError("No entrance year in LDAP data")
|
||||||
|
return "{}@{}".format(clipper_uid, data['entrance_year'])
|
||||||
|
|
||||||
|
def save_user(self, request, sociallogin, form=None):
|
||||||
|
if sociallogin.account.provider != "clipper":
|
||||||
|
return super(LongTermClipperAccountAdapter,
|
||||||
|
self).save_user(request, sociallogin, form)
|
||||||
|
user = sociallogin.user
|
||||||
|
user.set_unusable_password()
|
||||||
|
|
||||||
|
clipper_uid = sociallogin.account.uid
|
||||||
|
ldap_data = sociallogin._ldap_data if \
|
||||||
|
hasattr(sociallogin, '_ldap_data') \
|
||||||
|
else get_ldap_infos(clipper_uid)
|
||||||
|
|
||||||
|
username = self.get_username(clipper_uid, ldap_data)
|
||||||
|
email = ldap_data.get('email', get_clipper_email(clipper_uid))
|
||||||
|
name = ldap_data.get('name')
|
||||||
|
user_username(user, username or '')
|
||||||
|
user_email(user, email or '')
|
||||||
|
name_parts = (name or '').split(' ')
|
||||||
|
user_field(user, 'first_name', name_parts[0])
|
||||||
|
user_field(user, 'last_name', ' '.join(name_parts[1:]))
|
||||||
|
|
||||||
|
# Entrance year and department, if the user has these fields
|
||||||
|
entrance_year = ldap_data.get('entrance_year', '')
|
||||||
|
dep_code = ldap_data.get('department_code', '')
|
||||||
|
dep_fancy = ldap_data.get('department', '')
|
||||||
|
promotion = u'%s %s' % (dep_fancy, entrance_year)
|
||||||
|
user_field(user, 'entrance_year', entrance_year)
|
||||||
|
user_field(user, 'department_code', dep_code)
|
||||||
|
user_field(user, 'department', dep_fancy)
|
||||||
|
user_field(user, 'promotion', promotion)
|
||||||
|
|
||||||
|
# Ignore form
|
||||||
|
get_account_adapter().populate_username(request, user)
|
||||||
|
|
||||||
|
# Save extra data (only once)
|
||||||
|
sociallogin.account.extra_data['ldap'] = ldap_data
|
||||||
|
sociallogin.save(request)
|
||||||
|
sociallogin.account.save()
|
||||||
|
|
||||||
|
return user
|
||||||
|
|
||||||
|
|
||||||
|
def deprecate_clippers():
|
||||||
|
"""
|
||||||
|
Marks all the SocialAccount with clipper as deprecated, by setting their
|
||||||
|
provider to 'clipper_inactive'
|
||||||
|
"""
|
||||||
|
|
||||||
|
clippers = SocialAccount.objects.filter(provider='clipper')
|
||||||
|
c_uids = clippers.values_list('uid', flat=True)
|
||||||
|
|
||||||
|
# Clear old clipper accounts that were replaced by new ones
|
||||||
|
# (to avoid conflicts)
|
||||||
|
SocialAccount.objects.filter(provider='clipper_inactive',
|
||||||
|
uid__in=c_uids).delete()
|
||||||
|
|
||||||
|
# Deprecate accounts
|
||||||
|
clippers.update(provider='clipper_inactive')
|
||||||
|
|
||||||
|
|
||||||
|
def install_longterm_adapter(fake=False):
|
||||||
|
"""
|
||||||
|
Manages the transition from an older django_cas or an allauth_ens
|
||||||
|
installation without LongTermClipperAccountAdapter
|
||||||
|
"""
|
||||||
|
|
||||||
|
accounts = {u.username: u for u in User.objects.all()
|
||||||
|
if u.username.isalnum()}
|
||||||
|
ldap_connection = init_ldap()
|
||||||
|
ltc_adapter = get_adapter()
|
||||||
|
|
||||||
|
info = ldap_connection.search_s(
|
||||||
|
'dc=spi,dc=ens,dc=fr',
|
||||||
|
ldap.SCOPE_SUBTREE,
|
||||||
|
("(|{})".format(''.join(("(uid=%s)" % (un,))
|
||||||
|
for un in accounts.keys()))),
|
||||||
|
['uid',
|
||||||
|
'cn',
|
||||||
|
'mailRoutingAddress',
|
||||||
|
'homeDirectory'])
|
||||||
|
|
||||||
|
logs = {"created": [], "updated": []}
|
||||||
|
cases = []
|
||||||
|
|
||||||
|
for userinfo in info:
|
||||||
|
infos = userinfo[1]
|
||||||
|
data = extract_infos_from_ldap(infos)
|
||||||
|
clipper_uid = data['clipper_uid']
|
||||||
|
user = accounts.get(clipper_uid, None)
|
||||||
|
if user is None:
|
||||||
|
continue
|
||||||
|
user.username = ltc_adapter.get_username(clipper_uid, data)
|
||||||
|
if fake:
|
||||||
|
cases.append(clipper_uid)
|
||||||
|
else:
|
||||||
|
user.save()
|
||||||
|
cases.append(user.username)
|
||||||
|
|
||||||
|
try:
|
||||||
|
sa = SocialAccount.objects.get(provider='clipper', uid=clipper_uid)
|
||||||
|
if not sa.extra_data.get('ldap'):
|
||||||
|
sa.extra_data['ldap'] = data
|
||||||
|
if not fake:
|
||||||
|
sa.save(update_fields=['extra_data'])
|
||||||
|
logs["updated"].append((clipper_uid, user.username))
|
||||||
|
except SocialAccount.DoesNotExist:
|
||||||
|
sa = SocialAccount(
|
||||||
|
provider='clipper', uid=clipper_uid,
|
||||||
|
user=user, extra_data={'ldap': data},
|
||||||
|
)
|
||||||
|
if not fake:
|
||||||
|
sa.save()
|
||||||
|
logs["created"].append((clipper_uid, user.username))
|
||||||
|
|
||||||
|
logs["unmodified"] = User.objects.exclude(username__in=cases)\
|
||||||
|
.values_list("username", flat=True)
|
||||||
|
return logs
|
0
allauth_ens/management/__init__.py
Normal file
0
allauth_ens/management/__init__.py
Normal file
0
allauth_ens/management/commands/__init__.py
Normal file
0
allauth_ens/management/commands/__init__.py
Normal file
16
allauth_ens/management/commands/deprecate_clippers.py
Normal file
16
allauth_ens/management/commands/deprecate_clippers.py
Normal file
|
@ -0,0 +1,16 @@
|
||||||
|
# coding: utf-8
|
||||||
|
from django.core.management.base import BaseCommand
|
||||||
|
|
||||||
|
from allauth_ens.adapter import deprecate_clippers
|
||||||
|
|
||||||
|
|
||||||
|
class Command(BaseCommand):
|
||||||
|
help = 'Deprecates clipper SocialAccounts so as to avoid conflicts'
|
||||||
|
|
||||||
|
def add_arguments(self, parser):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def handle(self, *args, **options):
|
||||||
|
deprecate_clippers()
|
||||||
|
self.stdout.write(self.style.SUCCESS(
|
||||||
|
'Clippers deprecation successful'))
|
35
allauth_ens/management/commands/install_longterm.py
Normal file
35
allauth_ens/management/commands/install_longterm.py
Normal file
|
@ -0,0 +1,35 @@
|
||||||
|
# coding: utf-8
|
||||||
|
from django.core.management.base import BaseCommand
|
||||||
|
|
||||||
|
from allauth_ens.adapter import install_longterm_adapter
|
||||||
|
|
||||||
|
|
||||||
|
class Command(BaseCommand):
|
||||||
|
help = 'Manages the transition from an older django_cas' \
|
||||||
|
'or an allauth_ens installation without ' \
|
||||||
|
'LongTermClipperAccountAdapter'
|
||||||
|
|
||||||
|
def add_arguments(self, parser):
|
||||||
|
parser.add_argument(
|
||||||
|
'--fake',
|
||||||
|
action='store_true',
|
||||||
|
default=False,
|
||||||
|
help=('Does not save the models created/updated,'
|
||||||
|
'only shows the list'),
|
||||||
|
)
|
||||||
|
pass
|
||||||
|
|
||||||
|
def handle(self, *args, **options):
|
||||||
|
logs = install_longterm_adapter(options.get("fake", False))
|
||||||
|
self.stdout.write("Social accounts created : %d"
|
||||||
|
% len(logs["created"]))
|
||||||
|
self.stdout.write(" ".join(("%s -> %s" % s) for s in logs["created"]))
|
||||||
|
self.stdout.write("Social accounts displaced : %d"
|
||||||
|
% len(logs["updated"]))
|
||||||
|
self.stdout.write(" ".join(("%s -> %s" % s) for s in logs["updated"]))
|
||||||
|
self.stdout.write("User accounts unmodified : %d"
|
||||||
|
% len(logs["unmodified"]))
|
||||||
|
self.stdout.write(" ".join(logs["unmodified"]))
|
||||||
|
|
||||||
|
self.stdout.write(self.style.SUCCESS(
|
||||||
|
"LongTermClipper migration successful"))
|
|
@ -1,11 +1,8 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
import ldap
|
|
||||||
|
|
||||||
from allauth.account.models import EmailAddress
|
from allauth.account.models import EmailAddress
|
||||||
from allauth.socialaccount.providers.base import ProviderAccount
|
from allauth.socialaccount.providers.base import ProviderAccount
|
||||||
from allauth_cas.providers import CASProvider
|
|
||||||
|
|
||||||
from django.conf import settings
|
from allauth_cas.providers import CASProvider
|
||||||
|
|
||||||
|
|
||||||
class ClipperAccount(ProviderAccount):
|
class ClipperAccount(ProviderAccount):
|
||||||
|
@ -21,41 +18,14 @@ class ClipperProvider(CASProvider):
|
||||||
uid, extra = data
|
uid, extra = data
|
||||||
return '{}@clipper.ens.fr'.format(uid.strip().lower())
|
return '{}@clipper.ens.fr'.format(uid.strip().lower())
|
||||||
|
|
||||||
|
def extract_uid(self, data):
|
||||||
|
uid, _ = data
|
||||||
|
uid = uid.lower().strip()
|
||||||
|
return uid
|
||||||
|
|
||||||
def extract_common_fields(self, data):
|
def extract_common_fields(self, data):
|
||||||
def get_names(clipper):
|
|
||||||
assert clipper.isalnum()
|
|
||||||
try:
|
|
||||||
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT,
|
|
||||||
ldap.OPT_X_TLS_NEVER)
|
|
||||||
l = ldap.initialize("ldaps://ldap.spi.ens.fr:636")
|
|
||||||
l.set_option(ldap.OPT_REFERRALS, 0)
|
|
||||||
l.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
|
|
||||||
l.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND)
|
|
||||||
l.set_option(ldap.OPT_X_TLS_DEMAND, True)
|
|
||||||
l.set_option(ldap.OPT_DEBUG_LEVEL, 255)
|
|
||||||
l.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
|
|
||||||
l.set_option(ldap.OPT_TIMEOUT, 10)
|
|
||||||
|
|
||||||
info = l.search_s('dc=spi,dc=ens,dc=fr',
|
|
||||||
ldap.SCOPE_SUBTREE,
|
|
||||||
('(uid=%s)' % (clipper,)),
|
|
||||||
[str("cn"), ])
|
|
||||||
|
|
||||||
if len(info) > 0:
|
|
||||||
fullname = info[0][1].get('cn', [''])[0].decode("utf-8")
|
|
||||||
first_name, last_name = fullname.split(' ', 1)
|
|
||||||
return first_name, last_name
|
|
||||||
|
|
||||||
except ldap.LDAPError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
return '', ''
|
|
||||||
|
|
||||||
common = super(ClipperProvider, self).extract_common_fields(data)
|
common = super(ClipperProvider, self).extract_common_fields(data)
|
||||||
fn, ln = get_names(common['username'])
|
|
||||||
common['email'] = self.extract_email(data)
|
common['email'] = self.extract_email(data)
|
||||||
common['name'] = fn
|
|
||||||
common['last_name'] = ln
|
|
||||||
return common
|
return common
|
||||||
|
|
||||||
def extract_email_addresses(self, data):
|
def extract_email_addresses(self, data):
|
||||||
|
@ -67,8 +37,23 @@ class ClipperProvider(CASProvider):
|
||||||
]
|
]
|
||||||
|
|
||||||
def extract_extra_data(self, data):
|
def extract_extra_data(self, data):
|
||||||
|
"""
|
||||||
|
If LongTermClipperAccountAdapter is in use, keep the data retrieved
|
||||||
|
from the LDAP server.
|
||||||
|
"""
|
||||||
|
from allauth.socialaccount.models import SocialAccount # noqa
|
||||||
extra_data = super(ClipperProvider, self).extract_extra_data(data)
|
extra_data = super(ClipperProvider, self).extract_extra_data(data)
|
||||||
extra_data['email'] = self.extract_email(data)
|
extra_data['email'] = self.extract_email(data)
|
||||||
|
|
||||||
|
# Preserve LDAP data at all cost.
|
||||||
|
try:
|
||||||
|
clipper_account = SocialAccount.objects.get(
|
||||||
|
provider=self.id, uid=self.extract_uid(data))
|
||||||
|
if 'ldap' in clipper_account.extra_data:
|
||||||
|
extra_data['ldap'] = clipper_account.extra_data['ldap']
|
||||||
|
except SocialAccount.DoesNotExist:
|
||||||
|
pass
|
||||||
|
|
||||||
return extra_data
|
return extra_data
|
||||||
|
|
||||||
def message_suggest_caslogout_on_logout(self, request):
|
def message_suggest_caslogout_on_logout(self, request):
|
||||||
|
|
|
@ -17,6 +17,18 @@ class ClipperProviderTests(CASTestCase):
|
||||||
u = User.objects.get(username='clipper_uid')
|
u = User.objects.get(username='clipper_uid')
|
||||||
self.assertEqual(u.email, 'clipper_uid@clipper.ens.fr')
|
self.assertEqual(u.email, 'clipper_uid@clipper.ens.fr')
|
||||||
|
|
||||||
|
def test_extra_data_keeps_ldap_data(self):
|
||||||
|
clipper_conn = self.u.socialaccount_set.create(
|
||||||
|
uid='user', provider='clipper',
|
||||||
|
extra_data={'ldap': {'aa': 'bb'}},
|
||||||
|
)
|
||||||
|
|
||||||
|
self.client_cas_login(
|
||||||
|
self.client, provider_id='clipper', username='user')
|
||||||
|
|
||||||
|
clipper_conn.refresh_from_db()
|
||||||
|
self.assertEqual(clipper_conn.extra_data['ldap'], {'aa': 'bb'})
|
||||||
|
|
||||||
|
|
||||||
class ClipperViewsTests(CASViewTestCase):
|
class ClipperViewsTests(CASViewTestCase):
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,5 @@
|
||||||
|
from __future__ import unicode_literals
|
||||||
|
|
||||||
import re
|
import re
|
||||||
|
|
||||||
import django
|
import django
|
||||||
|
@ -6,6 +8,19 @@ from django.contrib.sites.models import Site
|
||||||
from django.core import mail
|
from django.core import mail
|
||||||
from django.test import TestCase, override_settings
|
from django.test import TestCase, override_settings
|
||||||
|
|
||||||
|
from allauth.socialaccount.models import SocialAccount
|
||||||
|
|
||||||
|
import six
|
||||||
|
from allauth_cas.test.testcases import CASTestCase
|
||||||
|
from fakeldap import MockLDAP
|
||||||
|
from mock import patch
|
||||||
|
|
||||||
|
from .adapter import deprecate_clippers, install_longterm_adapter
|
||||||
|
|
||||||
|
_mock_ldap = MockLDAP()
|
||||||
|
ldap_patcher = patch('allauth_ens.utils.ldap.initialize',
|
||||||
|
lambda x: _mock_ldap)
|
||||||
|
|
||||||
if django.VERSION >= (1, 10):
|
if django.VERSION >= (1, 10):
|
||||||
from django.urls import reverse
|
from django.urls import reverse
|
||||||
else:
|
else:
|
||||||
|
@ -135,3 +150,220 @@ class ViewsTests(TestCase):
|
||||||
def test_account_reset_password_from_key_done(self):
|
def test_account_reset_password_from_key_done(self):
|
||||||
r = self.client.get(reverse('account_reset_password_from_key_done'))
|
r = self.client.get(reverse('account_reset_password_from_key_done'))
|
||||||
self.assertEqual(r.status_code, 200)
|
self.assertEqual(r.status_code, 200)
|
||||||
|
|
||||||
|
|
||||||
|
@override_settings(
|
||||||
|
SOCIALACCOUNT_ADAPTER='allauth_ens.adapter.LongTermClipperAccountAdapter'
|
||||||
|
)
|
||||||
|
class LongTermClipperTests(CASTestCase):
|
||||||
|
def setUp(self):
|
||||||
|
ldap_patcher.start()
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
ldap_patcher.stop()
|
||||||
|
_mock_ldap.reset()
|
||||||
|
|
||||||
|
def _setup_ldap(self, promo=12, username="test"):
|
||||||
|
try:
|
||||||
|
buid = six.binary_type(username, 'utf-8')
|
||||||
|
home = six.binary_type('/users/%d/phy/test/' % promo, 'utf-8')
|
||||||
|
except TypeError:
|
||||||
|
buid = six.binary_type(username)
|
||||||
|
home = six.binary_type('/users/%d/phy/test/' % promo)
|
||||||
|
_mock_ldap.directory['dc=spi,dc=ens,dc=fr'] = {
|
||||||
|
'uid': [buid],
|
||||||
|
'cn': [b'John Smith'],
|
||||||
|
'mailRoutingAddress': [b'test@clipper.ens.fr'],
|
||||||
|
'homeDirectory': [home],
|
||||||
|
}
|
||||||
|
|
||||||
|
def _count_ldap_queries(self):
|
||||||
|
queries = _mock_ldap.ldap_methods_called()
|
||||||
|
count = len([op for op in queries if op != 'set_option'])
|
||||||
|
return count
|
||||||
|
|
||||||
|
def test_new_connexion(self):
|
||||||
|
self._setup_ldap()
|
||||||
|
|
||||||
|
r = self.client_cas_login(self.client, provider_id="clipper",
|
||||||
|
username="test")
|
||||||
|
u = r.context['user']
|
||||||
|
|
||||||
|
self.assertEqual(u.username, "test@12")
|
||||||
|
self.assertEqual(u.first_name, "John")
|
||||||
|
self.assertEqual(u.last_name, "Smith")
|
||||||
|
self.assertEqual(u.email, "test@clipper.ens.fr")
|
||||||
|
self.assertEqual(self._count_ldap_queries(), 1)
|
||||||
|
|
||||||
|
sa = list(SocialAccount.objects.all())[-1]
|
||||||
|
self.assertEqual(sa.user.id, u.id)
|
||||||
|
self.assertEqual(sa.extra_data['ldap']['entrance_year'], '12')
|
||||||
|
|
||||||
|
def test_connect_disconnect(self):
|
||||||
|
self._setup_ldap()
|
||||||
|
r0 = self.client_cas_login(self.client, provider_id="clipper",
|
||||||
|
username="test")
|
||||||
|
self.assertIn("_auth_user_id", self.client.session)
|
||||||
|
self.assertIn('user', r0.context)
|
||||||
|
|
||||||
|
self.client.logout()
|
||||||
|
self.assertNotIn("_auth_user_id", self.client.session)
|
||||||
|
|
||||||
|
def test_second_connexion(self):
|
||||||
|
self._setup_ldap()
|
||||||
|
|
||||||
|
self.client_cas_login(self.client, provider_id="clipper",
|
||||||
|
username="test")
|
||||||
|
self.client.logout()
|
||||||
|
|
||||||
|
nu = User.objects.count()
|
||||||
|
|
||||||
|
self.client_cas_login(self.client, provider_id="clipper",
|
||||||
|
username="test")
|
||||||
|
self.assertEqual(User.objects.count(), nu)
|
||||||
|
self.assertEqual(self._count_ldap_queries(), 1)
|
||||||
|
|
||||||
|
def test_deprecation(self):
|
||||||
|
self._setup_ldap()
|
||||||
|
self.client_cas_login(self.client, provider_id="clipper",
|
||||||
|
username="test")
|
||||||
|
deprecate_clippers()
|
||||||
|
|
||||||
|
sa = SocialAccount.objects.all()[0]
|
||||||
|
self.assertEqual(sa.provider, "clipper_inactive")
|
||||||
|
|
||||||
|
def test_reconnect_after_deprecation(self):
|
||||||
|
self._setup_ldap()
|
||||||
|
r = self.client_cas_login(self.client, provider_id="clipper",
|
||||||
|
username="test")
|
||||||
|
user0 = r.context['user']
|
||||||
|
n_sa0 = SocialAccount.objects.count()
|
||||||
|
n_u0 = User.objects.count()
|
||||||
|
self.client.logout()
|
||||||
|
|
||||||
|
deprecate_clippers()
|
||||||
|
|
||||||
|
r = self.client_cas_login(self.client, provider_id="clipper",
|
||||||
|
username="test")
|
||||||
|
user1 = r.context['user']
|
||||||
|
sa1 = list(SocialAccount.objects.all())
|
||||||
|
n_u1 = User.objects.count()
|
||||||
|
self.assertEqual(len(sa1), n_sa0)
|
||||||
|
self.assertEqual(n_u1, n_u0)
|
||||||
|
self.assertEqual(user1.id, user0.id)
|
||||||
|
self.assertEqual(self._count_ldap_queries(), 2)
|
||||||
|
|
||||||
|
def test_override_inactive_account(self):
|
||||||
|
self._setup_ldap(12)
|
||||||
|
r = self.client_cas_login(self.client, provider_id="clipper",
|
||||||
|
username="test")
|
||||||
|
user0 = r.context['user']
|
||||||
|
n_sa0 = SocialAccount.objects.count()
|
||||||
|
n_u0 = User.objects.count()
|
||||||
|
self.client.logout()
|
||||||
|
|
||||||
|
deprecate_clippers()
|
||||||
|
|
||||||
|
self._setup_ldap(13)
|
||||||
|
r = self.client_cas_login(self.client, provider_id="clipper",
|
||||||
|
username="test")
|
||||||
|
|
||||||
|
user1 = r.context['user']
|
||||||
|
sa1 = list(SocialAccount.objects.all())
|
||||||
|
n_u1 = User.objects.count()
|
||||||
|
self.assertEqual(len(sa1), n_sa0 + 1)
|
||||||
|
self.assertEqual(n_u1, n_u0 + 1)
|
||||||
|
self.assertNotEqual(user1.id, user0.id)
|
||||||
|
|
||||||
|
def test_multiple_deprecation(self):
|
||||||
|
self._setup_ldap(12)
|
||||||
|
self.client_cas_login(self.client, provider_id="clipper",
|
||||||
|
username="test")
|
||||||
|
self.client.logout()
|
||||||
|
|
||||||
|
self._setup_ldap(15, "truc")
|
||||||
|
self.client_cas_login(self.client, provider_id="clipper",
|
||||||
|
username="truc")
|
||||||
|
self.client.logout()
|
||||||
|
sa0 = SocialAccount.objects.count()
|
||||||
|
|
||||||
|
deprecate_clippers()
|
||||||
|
|
||||||
|
self._setup_ldap(13)
|
||||||
|
self.client_cas_login(self.client, provider_id="clipper",
|
||||||
|
username="test")
|
||||||
|
self.client.logout()
|
||||||
|
|
||||||
|
sa1 = SocialAccount.objects.count()
|
||||||
|
|
||||||
|
deprecate_clippers()
|
||||||
|
sa2 = SocialAccount.objects.count()
|
||||||
|
|
||||||
|
# Older "test" inactive SocialAccount gets erased by new one
|
||||||
|
# while "truc" remains
|
||||||
|
self.assertEqual(sa0, sa2)
|
||||||
|
self.assertEqual(sa1, sa0 + 1)
|
||||||
|
|
||||||
|
def test_longterm_installer_from_allauth(self):
|
||||||
|
self._setup_ldap(12)
|
||||||
|
with self.settings(
|
||||||
|
SOCIALACCOUNT_ADAPTER='allauth.socialaccount.'
|
||||||
|
'adapter.DefaultSocialAccountAdapter'):
|
||||||
|
r = self.client_cas_login(self.client, provider_id="clipper",
|
||||||
|
username='test')
|
||||||
|
user0 = r.context["user"]
|
||||||
|
nsa0 = SocialAccount.objects.count()
|
||||||
|
self.assertEqual(user0.username, "test")
|
||||||
|
self.client.logout()
|
||||||
|
|
||||||
|
outputs = install_longterm_adapter()
|
||||||
|
|
||||||
|
self.assertEqual(outputs["updated"], [("test", "test@12")])
|
||||||
|
r = self.client_cas_login(self.client, provider_id="clipper",
|
||||||
|
username='test')
|
||||||
|
user1 = r.context["user"]
|
||||||
|
nsa1 = SocialAccount.objects.count()
|
||||||
|
conn = user1.socialaccount_set.get(provider='clipper')
|
||||||
|
self.assertEqual(user1.id, user0.id)
|
||||||
|
self.assertEqual(nsa1, nsa0)
|
||||||
|
self.assertEqual(user1.username, "test@12")
|
||||||
|
self.assertEqual(conn.extra_data['ldap']['entrance_year'], '12')
|
||||||
|
|
||||||
|
def test_longterm_installer_from_djangocas(self):
|
||||||
|
with self.settings(
|
||||||
|
SOCIALACCOUNT_ADAPTER='allauth.socialaccount.'
|
||||||
|
'adapter.DefaultSocialAccountAdapter'):
|
||||||
|
user0 = User.objects.create_user('test', 'test@clipper.ens.fr',
|
||||||
|
'test')
|
||||||
|
nsa0 = SocialAccount.objects.count()
|
||||||
|
|
||||||
|
self._setup_ldap(12)
|
||||||
|
|
||||||
|
outputs = install_longterm_adapter()
|
||||||
|
|
||||||
|
self.assertEqual(outputs["created"], [("test", "test@12")])
|
||||||
|
r = self.client_cas_login(self.client, provider_id="clipper",
|
||||||
|
username='test')
|
||||||
|
user1 = r.context["user"]
|
||||||
|
nsa1 = SocialAccount.objects.count()
|
||||||
|
conn = user1.socialaccount_set.get(provider='clipper')
|
||||||
|
self.assertEqual(user1.id, user0.id)
|
||||||
|
self.assertEqual(nsa1, nsa0 + 1)
|
||||||
|
self.assertEqual(user1.username, "test@12")
|
||||||
|
self.assertEqual(conn.extra_data['ldap']['entrance_year'], '12')
|
||||||
|
|
||||||
|
def test_disconnect_ldap(self):
|
||||||
|
nu0 = User.objects.count()
|
||||||
|
nsa0 = SocialAccount.objects.count()
|
||||||
|
|
||||||
|
ldap_patcher.stop()
|
||||||
|
with self.settings(CLIPPER_LDAP_SERVER=''):
|
||||||
|
self.assertRaises(ValueError, self.client_cas_login,
|
||||||
|
self.client, provider_id="clipper",
|
||||||
|
username="test")
|
||||||
|
|
||||||
|
nu1 = User.objects.count()
|
||||||
|
nsa1 = SocialAccount.objects.count()
|
||||||
|
self.assertEqual(nu0, nu1)
|
||||||
|
self.assertEqual(nsa0, nsa1)
|
||||||
|
ldap_patcher.start()
|
||||||
|
|
126
allauth_ens/utils.py
Normal file
126
allauth_ens/utils.py
Normal file
|
@ -0,0 +1,126 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
from django.conf import settings
|
||||||
|
|
||||||
|
from allauth.account.models import EmailAddress
|
||||||
|
from allauth.account.utils import user_email
|
||||||
|
|
||||||
|
import ldap
|
||||||
|
|
||||||
|
DEPARTMENTS_LIST = {
|
||||||
|
'phy': u'Physique',
|
||||||
|
'maths': u'Maths',
|
||||||
|
'bio': u'Biologie',
|
||||||
|
'chimie': u'Chimie',
|
||||||
|
'geol': u'Géosciences',
|
||||||
|
'dec': u'DEC',
|
||||||
|
'info': u'Informatique',
|
||||||
|
'litt': u'Littéraire',
|
||||||
|
'guests': u'Pensionnaires étrangers',
|
||||||
|
'pei': u'PEI',
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def init_ldap():
|
||||||
|
server = getattr(settings, "CLIPPER_LDAP_SERVER",
|
||||||
|
"ldaps://ldap.spi.ens.fr:636")
|
||||||
|
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT,
|
||||||
|
ldap.OPT_X_TLS_NEVER)
|
||||||
|
ldap_connection = ldap.initialize(server)
|
||||||
|
ldap_connection.set_option(ldap.OPT_REFERRALS, 0)
|
||||||
|
ldap_connection.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
|
||||||
|
ldap_connection.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND)
|
||||||
|
ldap_connection.set_option(ldap.OPT_X_TLS_DEMAND, True)
|
||||||
|
ldap_connection.set_option(ldap.OPT_DEBUG_LEVEL, 255)
|
||||||
|
ldap_connection.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
|
||||||
|
ldap_connection.set_option(ldap.OPT_TIMEOUT, 10)
|
||||||
|
return ldap_connection
|
||||||
|
|
||||||
|
|
||||||
|
def extract_infos_from_ldap(infos):
|
||||||
|
data = {}
|
||||||
|
|
||||||
|
# Name
|
||||||
|
if 'cn' in infos:
|
||||||
|
data['name'] = infos['cn'][0].decode("utf-8")
|
||||||
|
|
||||||
|
# Parsing homeDirectory to get entrance year and departments
|
||||||
|
if 'homeDirectory' in infos:
|
||||||
|
dirs = infos['homeDirectory'][0].decode("utf-8").split('/')
|
||||||
|
if len(dirs) >= 4 and dirs[1] == 'users':
|
||||||
|
# Assume template "/users/<year>/<department>/clipper/"
|
||||||
|
annee = dirs[2]
|
||||||
|
dep = dirs[3]
|
||||||
|
dep_fancy = DEPARTMENTS_LIST.get(dep.lower(), '')
|
||||||
|
data['entrance_year'] = annee
|
||||||
|
data['department_code'] = dep
|
||||||
|
data['department'] = dep_fancy
|
||||||
|
|
||||||
|
# Mail
|
||||||
|
pmail = infos.get('mailRoutingAddress', [])
|
||||||
|
if pmail:
|
||||||
|
data['email'] = pmail[0].decode("utf-8")
|
||||||
|
|
||||||
|
# User id
|
||||||
|
if 'uid' in infos:
|
||||||
|
data['clipper_uid'] = infos['uid'][0].decode("utf-8").strip().lower()
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
def get_ldap_infos(clipper_uid):
|
||||||
|
assert clipper_uid.isalnum()
|
||||||
|
data = {}
|
||||||
|
try:
|
||||||
|
ldap_connection = init_ldap()
|
||||||
|
|
||||||
|
info = ldap_connection.search_s('dc=spi,dc=ens,dc=fr',
|
||||||
|
ldap.SCOPE_SUBTREE,
|
||||||
|
('(uid=%s)' % (clipper_uid,)),
|
||||||
|
['cn',
|
||||||
|
'mailRoutingAddress',
|
||||||
|
'homeDirectory'])
|
||||||
|
|
||||||
|
if len(info) > 0:
|
||||||
|
data = extract_infos_from_ldap(info[0][1])
|
||||||
|
|
||||||
|
except ldap.LDAPError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
def get_clipper_email(clipper):
|
||||||
|
return '{}@clipper.ens.fr'.format(clipper.strip().lower())
|
||||||
|
|
||||||
|
|
||||||
|
def remove_email(user, email):
|
||||||
|
"""
|
||||||
|
Removes an email address of a user.
|
||||||
|
|
||||||
|
If it is his primary email address, it sets another email address as
|
||||||
|
primary, preferably verified.
|
||||||
|
"""
|
||||||
|
u_mailaddrs = user.emailaddress_set.filter(user=user)
|
||||||
|
try:
|
||||||
|
mailaddr = user.emailaddress_set.get(email=email)
|
||||||
|
except EmailAddress.DoesNotExist:
|
||||||
|
return
|
||||||
|
|
||||||
|
if mailaddr.primary:
|
||||||
|
others = u_mailaddrs.filter(primary=False)
|
||||||
|
|
||||||
|
# Prefer a verified mail.
|
||||||
|
new_primary = (
|
||||||
|
others.filter(verified=True).last() or
|
||||||
|
others.last()
|
||||||
|
)
|
||||||
|
|
||||||
|
if new_primary:
|
||||||
|
# It also updates 'user.EMAIL_FIELD'.
|
||||||
|
new_primary.set_as_primary()
|
||||||
|
else:
|
||||||
|
user_email(user, '')
|
||||||
|
user.save()
|
||||||
|
|
||||||
|
mailaddr.delete()
|
|
@ -1,11 +1,17 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
from allauth.account.adapter import DefaultAccountAdapter
|
from allauth.account.adapter import DefaultAccountAdapter
|
||||||
|
from allauth_ens.adapter import LongTermClipperAccountAdapter
|
||||||
from allauth.socialaccount.adapter import DefaultSocialAccountAdapter
|
from allauth.socialaccount.adapter import DefaultSocialAccountAdapter
|
||||||
|
|
||||||
|
|
||||||
class AccountAdapter(DefaultAccountAdapter):
|
class AccountAdapter(DefaultAccountAdapter):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class SocialAccountAdapter(DefaultSocialAccountAdapter):
|
class SocialAccountAdapter(LongTermClipperAccountAdapter):
|
||||||
pass
|
|
||||||
|
def get_username(self, clipper, data):
|
||||||
|
"""
|
||||||
|
Exception-free version of get_username, so that it works even outside of
|
||||||
|
the ENS (if no access to LDAP server)
|
||||||
|
"""
|
||||||
|
return "{}@{}".format(clipper, data.get('entrance_year', '00'))
|
||||||
|
|
|
@ -7,7 +7,6 @@ combine_as_imports = True
|
||||||
default_section = THIRDPARTY
|
default_section = THIRDPARTY
|
||||||
include_trailing_comma = True
|
include_trailing_comma = True
|
||||||
known_allauth = allauth
|
known_allauth = allauth
|
||||||
known_future_library = future,six
|
|
||||||
known_django = django
|
known_django = django
|
||||||
known_first_party = allauth_ens
|
known_first_party = allauth_ens
|
||||||
multi_line_output = 5
|
multi_line_output = 5
|
||||||
|
|
3
tox.ini
3
tox.ini
|
@ -15,7 +15,8 @@ deps =
|
||||||
django111: django>=1.11,<2.0
|
django111: django>=1.11,<2.0
|
||||||
django20: django>=2.0,<2.1
|
django20: django>=2.0,<2.1
|
||||||
coverage
|
coverage
|
||||||
mock ; python_version < "3.0"
|
fakeldap
|
||||||
|
mock
|
||||||
usedevelop= True
|
usedevelop= True
|
||||||
commands =
|
commands =
|
||||||
python -V
|
python -V
|
||||||
|
|
Loading…
Reference in a new issue