Commande pour ajouter les conscrit·e·s

This commit is contained in:
Ludovic Stephan 2020-11-13 11:45:32 +01:00
parent a2c06a6de0
commit 5d25fc87f1
2 changed files with 253 additions and 0 deletions

View file

@ -0,0 +1,144 @@
from collections import namedtuple
import ldap
import ldap.filter
from django.conf import settings
ClipperAccount = namedtuple("ClipperAccount", ["uid", "name", "email", "year", "dept"])
class LDAP:
"""Classe pour se connecter à un LDAP ENS (CRI ou SPI).
Inspirée de `merle_scripts`.
"""
# À renseigner
LDAP_SERVER = {
"PROTOCOL": "ldaps",
"URL": "ldap.example.com",
"PORT": 636,
}
search_base = ""
attr_list = []
def __init__(self):
""" Initialize the LDAP object """
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
self.ldap_obj = ldap.initialize(
"{PROTOCOL}://{URL}:{PORT}".format(**self.LDAP_SERVER)
)
self.ldap_obj.set_option(ldap.OPT_REFERRALS, 0)
self.ldap_obj.set_option(ldap.OPT_PROTOCOL_VERSION, 3)
self.ldap_obj.set_option(ldap.OPT_X_TLS, ldap.OPT_X_TLS_DEMAND)
self.ldap_obj.set_option(ldap.OPT_X_TLS_DEMAND, True)
self.ldap_obj.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
self.ldap_obj.set_option(ldap.OPT_TIMEOUT, 10)
def search(self, filters="(objectClass=*)"):
"""Do a ldap.search_s with the given filters, as specified for
ldap.search_s"""
return self.ldap_obj.search_s(
self.search_base, ldap.SCOPE_SUBTREE, filters, self.attr_list
)
@staticmethod
def extract_ldap_info(entry, field):
""" Extract the given field from an LDAP entry as an UTF-8 string """
return entry[1].get(field, [b""])[0].decode("utf-8")
class ClipperLDAP(LDAP):
LDAP_SERVER = settings.LDAP["SPI"]
search_base = "dc=spi,dc=ens,dc=fr"
attr_list = ["cn", "uid", "mail", "homeDirectory"]
verbose_depts = {
"bio": "Biologie",
"chimie": "Chimie",
"dec": "Études Cognitives",
"geol": "Géosciences",
"guests": "Invité·e",
"info": "Informatique",
"litt": "Lettres",
"maths": "Mathématiques",
"pei": "PEI",
"phy": "Physique",
}
def parse_dept(self, home_dir):
"""Extrait le département d'entrée d'un·e élève à partir de son dossier.
Le dossier a le format `/users/<promo>/<dept>/<login>`.
"""
users, promo, dept, login = home_dir.split("/")[1:]
if users != "users":
raise ValueError("Invalid home directory")
# Ça casse en 2100, mais le système de naming de sas aussi...
promo = "20" + promo
return promo, self.verbose_depts.get(dept, None)
def get_clipper_list(self, promo_filter=None):
"""Extrait la liste des comptes clipper présents dans le LDAP.
Renvoie une liste de `namedTuple` contenant leur nom, prénom, adresse mail et
département/année d'entrée.
Si `promo_filter != None`, ne renvoie que les clippers d'une promotion donnée.
"""
search_res = self.search()
clipper_list = []
for entry in search_res:
uid = self.extract_ldap_info(entry, "uid")
# Il y a des comptes "bizarre" (e.g. `root`) avec uid vide
if len(uid) > 0:
try:
name = self.extract_ldap_info(entry, "cn")
email = self.extract_ldap_info(entry, "mail")
promo, dept = self.parse_dept(
self.extract_ldap_info(entry, "homeDirectory")
)
if promo_filter is None or promo == promo_filter:
clipper_list.append(
ClipperAccount(uid, name, email, promo, dept)
)
except ValueError:
pass
return clipper_list
class AnnuaireLDAP(LDAP):
LDAP_SERVER = settings.LDAP["CRI"]
search_base = "ou=people,dc=ens,dc=fr"
attr_list = ["uid", "sn", "givenName", "ou"]
def try_match(self, clipper):
"""Essaie de trouver une entrée correspondant au compte clipper donné dans
l'annuaire de l'ENS. L'heuristique est la suivante : il est très probable
que le prénom de la personne commence par le premier mot de `clipper.fullname`,
et que son nom de famille finisse par le dernier mot.
"""
given_name = clipper.name.split(" ")[0]
last_name = clipper.name.split(" ")[-1]
search_name = self.search(
"(&(givenName={}*)(sn=*{}))".format(given_name, last_name)
)
if len(search_name) > 0:
if len(search_name) > 2:
print("Erreur : deux résultats trouvés pour {}".format(clipper.uid))
return None
return self.extract_ldap_info(search_name[0], "uid")
return None

View file

@ -0,0 +1,109 @@
from datetime import date
from io import BytesIO
from urllib.error import HTTPError
from urllib.request import urlopen
from django.conf import settings
from django.contrib.auth.models import User
from django.core.files import File
from django.core.management.base import BaseCommand
from fiches.models import Department, Profile
from ._ldap import AnnuaireLDAP, ClipperLDAP
class Command(BaseCommand):
help = (
"Importe les noms et (si possible) les photos des conscrit·e·s dans l'annuaire."
)
def add_arguments(self, parser):
group = parser.add_mutually_exclusive_group()
group.add_argument(
"--all", action="store_true", help="Importe l'intégralité des promotions"
)
group.add_argument("--promo", help="Spécifie la promotion à importer")
parser.add_argument(
"--without-photos",
action="store_true",
help="N'essaie pas d'importer les photos associées",
)
def get_current_promo(self):
today = date.today()
year = today.year
if today.month < 9:
year -= 1
return str(year)
def handle(self, *args, **options):
if options["all"]:
promo = None
elif options["promo"] is not None:
promo = options["promo"]
if len(promo) == 2:
promo = "20" + promo
else:
promo = self.get_current_promo()
# On récupère la liste des élèves à créer
ldap = ClipperLDAP()
clipper_list = ldap.get_clipper_list(promo_filter=promo)
# On vire les élèves déjà existants
existing_users = set(User.objects.values_list("username", flat=True))
clippers = [
clipper for clipper in clipper_list if clipper.uid not in existing_users
]
# Les départements sont créés à la main ; il y en a peu.
depts = {
dept: Department.objects.get_or_create(name=dept)[0].id
for dept in ldap.verbose_depts.values()
}
users_to_create = []
profiles_to_create = []
dept_m2m_to_create = []
for clipper in clippers:
user = User(username=clipper.uid, email=clipper.email)
profile = Profile(user=user, full_name=clipper.name, promotion=clipper.year)
users_to_create.append(user)
profiles_to_create.append(profile)
dept_m2m_to_create.append(
Profile.department.through(
profile=profile, department_id=depts[clipper.dept]
)
)
User.objects.bulk_create(users_to_create)
for profile in profiles_to_create:
profile.user_id = profile.user.id
profiles = Profile.objects.bulk_create(profiles_to_create)
for dept_m2m in dept_m2m_to_create:
dept_m2m.profile_id = dept_m2m.profile.id
Profile.department.through.objects.bulk_create(dept_m2m_to_create)
# Gestion des images
if not options["without_photos"]:
cri_ldap = AnnuaireLDAP()
base_annuaire = "{PROTOCOL}://{URL}:{PORT}".format(**settings.ANNUAIRE)
for clipper, profile in zip(clippers, profiles):
cri_login = cri_ldap.try_match(clipper)
if cri_login is not None:
img_url = "{}/photos/{}.jpg".format(base_annuaire, cri_login)
try:
istream = urlopen(img_url)
profile.picture.save(
name="{}.jpg".format(profile.user.username),
content=File(BytesIO(istream.read())),
)
except HTTPError:
# Parfois il y a une erreur 404 : dans ce cas, pas de photo
pass