diff --git a/fiches/management/commands/_ldap.py b/fiches/management/commands/_ldap.py new file mode 100644 index 0000000..34a01ca --- /dev/null +++ b/fiches/management/commands/_ldap.py @@ -0,0 +1,144 @@ +from collections import namedtuple + +import ldap +import ldap.filter +from django.conf import settings + +ClipperAccount = namedtuple("ClipperAccount", ["uid", "name", "email", "year", "dept"]) + + +class LDAP: + """Classe pour se connecter à un LDAP ENS (CRI ou SPI). + Inspirée de `merle_scripts`. + """ + + # À renseigner + LDAP_SERVER = { + "PROTOCOL": "ldaps", + "URL": "ldap.example.com", + "PORT": 636, + } + + search_base = "" + attr_list = [] + + def __init__(self): + """ Initialize the LDAP object """ + + ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER) + self.ldap_obj = ldap.initialize( + "{PROTOCOL}://{URL}:{PORT}".format(**self.LDAP_SERVER) + ) + self.ldap_obj.set_option(ldap.OPT_REFERRALS, 0) + self.ldap_obj.set_option(ldap.OPT_PROTOCOL_VERSION, 3) + self.ldap_obj.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND) + self.ldap_obj.set_option(ldap.OPT_X_TLS_DEMAND, True) + self.ldap_obj.set_option(ldap.OPT_NETWORK_TIMEOUT, 10) + self.ldap_obj.set_option(ldap.OPT_TIMEOUT, 10) + + def search(self, filters="(objectClass=*)"): + """Do a ldap.search_s with the given filters, as specified for + ldap.search_s""" + + return self.ldap_obj.search_s( + self.search_base, ldap.SCOPE_SUBTREE, filters, self.attr_list + ) + + @staticmethod + def extract_ldap_info(entry, field): + """ Extract the given field from an LDAP entry as an UTF-8 string """ + return entry[1].get(field, [b""])[0].decode("utf-8") + + +class ClipperLDAP(LDAP): + + LDAP_SERVER = settings.LDAP["SPI"] + + search_base = "dc=spi,dc=ens,dc=fr" + attr_list = ["cn", "uid", "mail", "homeDirectory"] + + verbose_depts = { + "bio": "Biologie", + "chimie": "Chimie", + "dec": "Études Cognitives", + "geol": "Géosciences", + "guests": "Invité·e", + "info": "Informatique", + "litt": "Lettres", + "maths": "Mathématiques", + "pei": "PEI", + "phy": "Physique", + } + + def parse_dept(self, home_dir): + """Extrait le département d'entrée d'un·e élève à partir de son dossier. + Le dossier a le format `/users///`. + """ + users, promo, dept, login = home_dir.split("/")[1:] + + if users != "users": + raise ValueError("Invalid home directory") + + # Ça casse en 2100, mais le système de naming de sas aussi... + promo = "20" + promo + + return promo, self.verbose_depts.get(dept, None) + + def get_clipper_list(self, promo_filter=None): + """Extrait la liste des comptes clipper présents dans le LDAP. + Renvoie une liste de `namedTuple` contenant leur nom, prénom, adresse mail et + département/année d'entrée. + Si `promo_filter != None`, ne renvoie que les clippers d'une promotion donnée. + """ + search_res = self.search() + clipper_list = [] + + for entry in search_res: + uid = self.extract_ldap_info(entry, "uid") + # Il y a des comptes "bizarre" (e.g. `root`) avec uid vide + if len(uid) > 0: + try: + name = self.extract_ldap_info(entry, "cn") + email = self.extract_ldap_info(entry, "mail") + promo, dept = self.parse_dept( + self.extract_ldap_info(entry, "homeDirectory") + ) + + if promo_filter is None or promo == promo_filter: + clipper_list.append( + ClipperAccount(uid, name, email, promo, dept) + ) + except ValueError: + pass + + return clipper_list + + +class AnnuaireLDAP(LDAP): + + LDAP_SERVER = settings.LDAP["CRI"] + + search_base = "ou=people,dc=ens,dc=fr" + attr_list = ["uid", "sn", "givenName", "ou"] + + def try_match(self, clipper): + """Essaie de trouver une entrée correspondant au compte clipper donné dans + l'annuaire de l'ENS. L'heuristique est la suivante : il est très probable + que le prénom de la personne commence par le premier mot de `clipper.fullname`, + et que son nom de famille finisse par le dernier mot. + """ + given_name = clipper.name.split(" ")[0] + last_name = clipper.name.split(" ")[-1] + + search_name = self.search( + "(&(givenName={}*)(sn=*{}))".format(given_name, last_name) + ) + + if len(search_name) > 0: + if len(search_name) > 2: + print("Erreur : deux résultats trouvés pour {}".format(clipper.uid)) + return None + + return self.extract_ldap_info(search_name[0], "uid") + + return None diff --git a/fiches/management/commands/add_conscrits.py b/fiches/management/commands/add_conscrits.py new file mode 100644 index 0000000..d668e93 --- /dev/null +++ b/fiches/management/commands/add_conscrits.py @@ -0,0 +1,109 @@ +from datetime import date +from io import BytesIO +from urllib.error import HTTPError +from urllib.request import urlopen + +from django.conf import settings +from django.contrib.auth.models import User +from django.core.files import File +from django.core.management.base import BaseCommand + +from fiches.models import Department, Profile + +from ._ldap import AnnuaireLDAP, ClipperLDAP + + +class Command(BaseCommand): + help = ( + "Importe les noms et (si possible) les photos des conscrit·e·s dans l'annuaire." + ) + + def add_arguments(self, parser): + group = parser.add_mutually_exclusive_group() + group.add_argument( + "--all", action="store_true", help="Importe l'intégralité des promotions" + ) + group.add_argument("--promo", help="Spécifie la promotion à importer") + + parser.add_argument( + "--without-photos", + action="store_true", + help="N'essaie pas d'importer les photos associées", + ) + + def get_current_promo(self): + today = date.today() + year = today.year + if today.month < 9: + year -= 1 + + return str(year) + + def handle(self, *args, **options): + if options["all"]: + promo = None + elif options["promo"] is not None: + promo = options["promo"] + if len(promo) == 2: + promo = "20" + promo + else: + promo = self.get_current_promo() + + # On récupère la liste des élèves à créer + ldap = ClipperLDAP() + clipper_list = ldap.get_clipper_list(promo_filter=promo) + + # On vire les élèves déjà existants + existing_users = set(User.objects.values_list("username", flat=True)) + clippers = [ + clipper for clipper in clipper_list if clipper.uid not in existing_users + ] + + # Les départements sont créés à la main ; il y en a peu. + depts = { + dept: Department.objects.get_or_create(name=dept)[0].id + for dept in ldap.verbose_depts.values() + } + + users_to_create = [] + profiles_to_create = [] + dept_m2m_to_create = [] + + for clipper in clippers: + user = User(username=clipper.uid, email=clipper.email) + profile = Profile(user=user, full_name=clipper.name, promotion=clipper.year) + users_to_create.append(user) + profiles_to_create.append(profile) + dept_m2m_to_create.append( + Profile.department.through( + profile=profile, department_id=depts[clipper.dept] + ) + ) + + User.objects.bulk_create(users_to_create) + for profile in profiles_to_create: + profile.user_id = profile.user.id + profiles = Profile.objects.bulk_create(profiles_to_create) + for dept_m2m in dept_m2m_to_create: + dept_m2m.profile_id = dept_m2m.profile.id + Profile.department.through.objects.bulk_create(dept_m2m_to_create) + + # Gestion des images + + if not options["without_photos"]: + cri_ldap = AnnuaireLDAP() + base_annuaire = "{PROTOCOL}://{URL}:{PORT}".format(**settings.ANNUAIRE) + + for clipper, profile in zip(clippers, profiles): + cri_login = cri_ldap.try_match(clipper) + if cri_login is not None: + img_url = "{}/photos/{}.jpg".format(base_annuaire, cri_login) + try: + istream = urlopen(img_url) + profile.picture.save( + name="{}.jpg".format(profile.user.username), + content=File(BytesIO(istream.read())), + ) + except HTTPError: + # Parfois il y a une erreur 404 : dans ce cas, pas de photo + pass