Detoxify + debug py34

This commit is contained in:
Evarin 2018-09-29 23:32:50 +02:00
parent 35a3bc3e2d
commit b189c11454
3 changed files with 63 additions and 50 deletions

View file

@ -10,10 +10,14 @@ from allauth.socialaccount.models import SocialAccount
import ldap import ldap
from .utils import extract_infos_from_ldap, get_ldap_infos, get_clipper_email, remove_email, init_ldap from .utils import (
extract_infos_from_ldap, get_clipper_email, get_ldap_infos, init_ldap,
remove_email,
)
User = get_user_model() User = get_user_model()
class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter): class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter):
""" """
A class to manage the fact that people loose their account at the end of A class to manage the fact that people loose their account at the end of
@ -25,8 +29,8 @@ class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter):
If a clipper connection has already existed with the uid, it checks If a clipper connection has already existed with the uid, it checks
that this connection still belongs to the user it was associated with. that this connection still belongs to the user it was associated with.
This check is performed by comparing the generated username corresponding This check is performed by comparing the generated username
to this connection with the old one. corresponding to this connection with the old one.
If the check succeeds, it simply reactivates the clipper connection as If the check succeeds, it simply reactivates the clipper connection as
belonging to the associated user. belonging to the associated user.
@ -59,7 +63,7 @@ class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter):
# if a new SocialAccount is created # if a new SocialAccount is created
email = ldap_data.get('email', get_clipper_email(clipper_uid)) email = ldap_data.get('email', get_clipper_email(clipper_uid))
remove_email(old_conn.user, email) remove_email(old_conn.user, email)
return return
# The admission year is the same, we can update the model and keep # The admission year is the same, we can update the model and keep
@ -87,9 +91,9 @@ class LongTermClipperAccountAdapter(DefaultSocialAccountAdapter):
user.set_unusable_password() user.set_unusable_password()
clipper_uid = sociallogin.account.uid clipper_uid = sociallogin.account.uid
ldap_data = sociallogin._ldap_data if hasattr(sociallogin, ldap_data = sociallogin._ldap_data if \
'_ldap_data') \ hasattr(sociallogin, '_ldap_data') \
else get_ldap_infos(clipper_uid) else get_ldap_infos(clipper_uid)
username = self.get_username(clipper_uid, ldap_data) username = self.get_username(clipper_uid, ldap_data)
email = ldap_data.get('email', get_clipper_email(clipper_uid)) email = ldap_data.get('email', get_clipper_email(clipper_uid))
@ -147,17 +151,18 @@ def install_longterm_adapter(fake=False):
accounts = {u.username: u for u in User.objects.all() accounts = {u.username: u for u in User.objects.all()
if u.username.isalnum()} if u.username.isalnum()}
l = init_ldap() ldap_connection = init_ldap()
ltc_adapter = get_adapter() ltc_adapter = get_adapter()
info = l.search_s('dc=spi,dc=ens,dc=fr', info = ldap_connection.search_s(
ldap.SCOPE_SUBTREE, 'dc=spi,dc=ens,dc=fr',
("(|{})".format(''.join(("(uid=%s)" % (un,)) ldap.SCOPE_SUBTREE,
for un in accounts.keys()))), ("(|{})".format(''.join(("(uid=%s)" % (un,))
['uid', for un in accounts.keys()))),
'cn', ['uid',
'mailRoutingAddress', 'cn',
'homeDirectory']) 'mailRoutingAddress',
'homeDirectory'])
logs = {"created": [], "updated": []} logs = {"created": [], "updated": []}
cases = [] cases = []

View file

@ -165,12 +165,12 @@ class LongTermClipperTests(CASTestCase):
'uid': [username], 'uid': [username],
'cn': [b'John Smith'], 'cn': [b'John Smith'],
'mailRoutingAddress': [b'test@clipper.ens.fr'], 'mailRoutingAddress': [b'test@clipper.ens.fr'],
'homeDirectory': [b'/users/%d/phy/test/' % promo], 'homeDirectory': [bytes('/users/%d/phy/test/' % promo)],
} }
def _count_ldap_queries(self): def _count_ldap_queries(self):
queries = _mock_ldap.ldap_methods_called() queries = _mock_ldap.ldap_methods_called()
count = len([l for l in queries if l != 'set_option']) count = len([op for op in queries if op != 'set_option'])
return count return count
def test_new_connexion(self): def test_new_connexion(self):
@ -267,13 +267,13 @@ class LongTermClipperTests(CASTestCase):
def test_multiple_deprecation(self): def test_multiple_deprecation(self):
self._setup_ldap(12) self._setup_ldap(12)
r = self.client_cas_login(self.client, provider_id="clipper", self.client_cas_login(self.client, provider_id="clipper",
username="test") username="test")
self.client.logout() self.client.logout()
self._setup_ldap(15, "truc") self._setup_ldap(15, "truc")
r = self.client_cas_login(self.client, provider_id="clipper", self.client_cas_login(self.client, provider_id="clipper",
username="truc") username="truc")
self.client.logout() self.client.logout()
sa0 = SocialAccount.objects.count() sa0 = SocialAccount.objects.count()
@ -296,8 +296,9 @@ class LongTermClipperTests(CASTestCase):
def test_longterm_installer_from_allauth(self): def test_longterm_installer_from_allauth(self):
self._setup_ldap(12) self._setup_ldap(12)
with self.settings(SOCIALACCOUNT_ADAPTER= with self.settings(
'allauth.socialaccount.adapter.DefaultSocialAccountAdapter'): SOCIALACCOUNT_ADAPTER='allauth.socialaccount.'
'adapter.DefaultSocialAccountAdapter'):
r = self.client_cas_login(self.client, provider_id="clipper", r = self.client_cas_login(self.client, provider_id="clipper",
username='test') username='test')
user0 = r.context["user"] user0 = r.context["user"]
@ -305,9 +306,9 @@ class LongTermClipperTests(CASTestCase):
self.assertEqual(user0.username, "test") self.assertEqual(user0.username, "test")
self.client.logout() self.client.logout()
l = install_longterm_adapter() outputs = install_longterm_adapter()
self.assertEqual(l["updated"], [("test", "test@12")]) self.assertEqual(outputs["updated"], [("test", "test@12")])
r = self.client_cas_login(self.client, provider_id="clipper", r = self.client_cas_login(self.client, provider_id="clipper",
username='test') username='test')
user1 = r.context["user"] user1 = r.context["user"]
@ -317,17 +318,18 @@ class LongTermClipperTests(CASTestCase):
self.assertEqual(user1.username, "test@12") self.assertEqual(user1.username, "test@12")
def test_longterm_installer_from_djangocas(self): def test_longterm_installer_from_djangocas(self):
with self.settings(SOCIALACCOUNT_ADAPTER= with self.settings(
'allauth.socialaccount.adapter.DefaultSocialAccountAdapter'): SOCIALACCOUNT_ADAPTER='allauth.socialaccount.'
'adapter.DefaultSocialAccountAdapter'):
user0 = User.objects.create_user('test', 'test@clipper.ens.fr', user0 = User.objects.create_user('test', 'test@clipper.ens.fr',
'test') 'test')
nsa0 = SocialAccount.objects.count() nsa0 = SocialAccount.objects.count()
self._setup_ldap(12) self._setup_ldap(12)
l = install_longterm_adapter() outputs = install_longterm_adapter()
self.assertEqual(l["created"], [("test", "test@12")]) self.assertEqual(outputs["created"], [("test", "test@12")])
r = self.client_cas_login(self.client, provider_id="clipper", r = self.client_cas_login(self.client, provider_id="clipper",
username='test') username='test')
user1 = r.context["user"] user1 = r.context["user"]

View file

@ -20,23 +20,26 @@ DEPARTMENTS_LIST = {
'pei': u'PEI', 'pei': u'PEI',
} }
def init_ldap(): def init_ldap():
server = getattr(settings, "CLIPPER_LDAP_SERVER", "ldaps://ldap.spi.ens.fr:636") server = getattr(settings, "CLIPPER_LDAP_SERVER",
"ldaps://ldap.spi.ens.fr:636")
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT,
ldap.OPT_X_TLS_NEVER) ldap.OPT_X_TLS_NEVER)
l = ldap.initialize(server) ldap_connection = ldap.initialize(server)
l.set_option(ldap.OPT_REFERRALS, 0) ldap_connection.set_option(ldap.OPT_REFERRALS, 0)
l.set_option(ldap.OPT_PROTOCOL_VERSION, 3) ldap_connection.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
l.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND) ldap_connection.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND)
l.set_option(ldap.OPT_X_TLS_DEMAND, True) ldap_connection.set_option(ldap.OPT_X_TLS_DEMAND, True)
l.set_option(ldap.OPT_DEBUG_LEVEL, 255) ldap_connection.set_option(ldap.OPT_DEBUG_LEVEL, 255)
l.set_option(ldap.OPT_NETWORK_TIMEOUT, 10) ldap_connection.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
l.set_option(ldap.OPT_TIMEOUT, 10) ldap_connection.set_option(ldap.OPT_TIMEOUT, 10)
return l return ldap_connection
def extract_infos_from_ldap(infos): def extract_infos_from_ldap(infos):
data = {} data = {}
# Name # Name
if 'cn' in infos: if 'cn' in infos:
data['name'] = infos['cn'][0].decode("utf-8") data['name'] = infos['cn'][0].decode("utf-8")
@ -57,25 +60,26 @@ def extract_infos_from_ldap(infos):
pmail = infos.get('mailRoutingAddress', []) pmail = infos.get('mailRoutingAddress', [])
if pmail: if pmail:
data['email'] = pmail[0].decode("utf-8") data['email'] = pmail[0].decode("utf-8")
# User id # User id
if 'uid' in infos: if 'uid' in infos:
data['clipper_uid'] = infos['uid'][0].decode("utf-8").strip().lower() data['clipper_uid'] = infos['uid'][0].decode("utf-8").strip().lower()
return data return data
def get_ldap_infos(clipper_uid): def get_ldap_infos(clipper_uid):
assert clipper_uid.isalnum() assert clipper_uid.isalnum()
data = {} data = {}
try: try:
l = init_ldap() ldap_connection = init_ldap()
info = l.search_s('dc=spi,dc=ens,dc=fr', info = ldap_connection.search_s('dc=spi,dc=ens,dc=fr',
ldap.SCOPE_SUBTREE, ldap.SCOPE_SUBTREE,
('(uid=%s)' % (clipper_uid,)), ('(uid=%s)' % (clipper_uid,)),
['cn', ['cn',
'mailRoutingAddress', 'mailRoutingAddress',
'homeDirectory']) 'homeDirectory'])
if len(info) > 0: if len(info) > 0:
data = extract_infos_from_ldap(info[0][1]) data = extract_infos_from_ldap(info[0][1])
@ -85,9 +89,11 @@ def get_ldap_infos(clipper_uid):
return data return data
def get_clipper_email(clipper): def get_clipper_email(clipper):
return '{}@clipper.ens.fr'.format(clipper.strip().lower()) return '{}@clipper.ens.fr'.format(clipper.strip().lower())
def remove_email(user, email): def remove_email(user, email):
""" """
Removes an email address of a user. Removes an email address of a user.