Merge branch 'Aufinal/ldap' into 'master'

Import des conscrit·e·s depuis le LDAP

Closes #7

See merge request klub-dev-ens/annuaire!11
This commit is contained in:
Martin Pepin 2021-01-08 12:50:36 +01:00
commit e16b04f127
8 changed files with 443 additions and 46 deletions

5
.gitignore vendored
View file

@ -3,11 +3,16 @@
*.swo
*~
*#
*.log
/media/picture
/venv/
.vagrant/
static/
*.sqlite3
fiches/templates/fiches/base_old.html
fiches/static/fiches/css_old/
.vscode

View file

@ -20,7 +20,27 @@ BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# See https://docs.djangoproject.com/en/dev/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = '84=n(@@wl(04oc$(-+3surgrlf&uq3=m)=(hpg$immi1h69s)p'
SECRET_KEY = "84=n(@@wl(04oc$(-+3surgrlf&uq3=m)=(hpg$immi1h69s)p"
# À bouger dans un ficher secret quand il sera créé ?
LDAP = {
"SPI": {
"PROTOCOL": "ldaps",
"URL": "ldap.spi.ens.fr",
"PORT": 636,
},
"CRI": {
"PROTOCOL": "ldap",
"URL": "annuaire.ens.fr",
"PORT": 389,
},
}
ANNUAIRE = {
"PROTOCOL": "http",
"URL": "annuaireweb.ens.fr",
"PORT": 80,
}
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
@ -30,59 +50,59 @@ ALLOWED_HOSTS = []
# Application definition
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django_cas_ng',
'fiches'
"django.contrib.admin",
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"django.contrib.messages",
"django.contrib.staticfiles",
"django_cas_ng",
"fiches",
]
MIDDLEWARE = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
"django.middleware.security.SecurityMiddleware",
"django.contrib.sessions.middleware.SessionMiddleware",
"django.middleware.common.CommonMiddleware",
"django.middleware.csrf.CsrfViewMiddleware",
"django.contrib.auth.middleware.AuthenticationMiddleware",
"django.contrib.messages.middleware.MessageMiddleware",
"django.middleware.clickjacking.XFrameOptionsMiddleware",
]
ROOT_URLCONF = 'annuaire.urls'
ROOT_URLCONF = "annuaire.urls"
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
"BACKEND": "django.template.backends.django.DjangoTemplates",
"DIRS": [],
"APP_DIRS": True,
"OPTIONS": {
"context_processors": [
"django.template.context_processors.debug",
"django.template.context_processors.request",
"django.contrib.auth.context_processors.auth",
"django.contrib.messages.context_processors.messages",
],
},
},
]
AUTHENTICATION_BACKENDS = (
'django.contrib.auth.backends.ModelBackend',
'django_cas_ng.backends.CASBackend',
"django.contrib.auth.backends.ModelBackend",
"django_cas_ng.backends.CASBackend",
)
WSGI_APPLICATION = 'annuaire.wsgi.application'
WSGI_APPLICATION = "annuaire.wsgi.application"
# Database
# https://docs.djangoproject.com/en/dev/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": os.path.join(BASE_DIR, "db.sqlite3"),
}
}
@ -92,17 +112,16 @@ DATABASES = {
AUTH_PASSWORD_VALIDATORS = [
{
'NAME':
'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
"NAME": "django.contrib.auth.password_validation.UserAttributeSimilarityValidator",
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
"NAME": "django.contrib.auth.password_validation.MinimumLengthValidator",
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
"NAME": "django.contrib.auth.password_validation.CommonPasswordValidator",
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
"NAME": "django.contrib.auth.password_validation.NumericPasswordValidator",
},
]
@ -110,9 +129,9 @@ AUTH_PASSWORD_VALIDATORS = [
# Internationalization
# https://docs.djangoproject.com/en/dev/topics/i18n/
LANGUAGE_CODE = 'fr-fr'
LANGUAGE_CODE = "fr-fr"
TIME_ZONE = 'UTC'
TIME_ZONE = "UTC"
USE_I18N = True
@ -124,14 +143,14 @@ USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/dev/howto/static-files/
STATIC_URL = '/static/'
STATIC_URL = "/static/"
MEDIA_ROOT = os.path.join(BASE_DIR, 'media')
MEDIA_ROOT = os.path.join(BASE_DIR, "media")
MEDIA_URL = '/media/'
MEDIA_URL = "/media/"
CAS_SERVER_URL = 'https://cas.eleves.ens.fr/'
CAS_SERVER_URL = "https://cas.eleves.ens.fr/"
CAS_VERSION = "2"
EMAIL_BACKEND = 'django.core.mail.backends.console.EmailBackend'
EMAIL_BACKEND = "django.core.mail.backends.console.EmailBackend"

View file

@ -0,0 +1,146 @@
from collections import namedtuple
import ldap
import ldap.filter
from django.conf import settings
ClipperAccount = namedtuple("ClipperAccount", ["uid", "name", "email", "year", "dept"])
class LDAP:
"""Classe pour se connecter à un LDAP ENS (CRI ou SPI).
Inspirée de `merle_scripts`.
"""
# À renseigner
LDAP_SERVER = {
"PROTOCOL": "ldaps",
"URL": "ldap.example.com",
"PORT": 636,
}
search_base = ""
attr_list = []
def __init__(self):
""" Initialize the LDAP object """
self.ldap_obj = ldap.initialize(
"{PROTOCOL}://{URL}:{PORT}".format(**self.LDAP_SERVER)
)
def search(self, filters="(objectClass=*)"):
"""Do a ldap.search_s with the given filters, as specified for
ldap.search_s"""
return self.ldap_obj.search_s(
self.search_base, ldap.SCOPE_SUBTREE, filters, self.attr_list
)
@staticmethod
def extract_ldap_info(entry, field):
""" Extract the given field from an LDAP entry as an UTF-8 string """
return entry[1].get(field, [b""])[0].decode("utf-8")
class ClipperLDAP(LDAP):
LDAP_SERVER = settings.LDAP["SPI"]
search_base = "dc=spi,dc=ens,dc=fr"
attr_list = ["cn", "uid", "mail", "homeDirectory"]
verbose_depts = {
"bio": "Biologie",
"chimie": "Chimie",
"dec": "Études Cognitives",
"geol": "Géosciences",
"guests": "Invité·e",
"info": "Informatique",
"litt": "Lettres",
"maths": "Mathématiques",
"pei": "PEI",
"phy": "Physique",
}
def parse_dept(self, home_dir):
"""Extrait le département d'entrée d'un·e élève à partir de son dossier.
Le dossier a le format `/users/<promo>/<dept>/<login>`.
"""
users, promo, dept, login = home_dir.split("/")[1:]
if users != "users":
raise ValueError("Invalid home directory")
# Ça casse en 2100, mais le système de naming de sas aussi...
promo = 2000 + int(promo)
return promo, self.verbose_depts.get(dept, None)
def get_clipper_list(self, promo_filter=None, verbosity=1, stdout=None):
"""Extrait la liste des comptes clipper présents dans le LDAP.
Renvoie une liste de `namedTuple` contenant leur nom, prénom, adresse mail et
département/année d'entrée.
Si `promo_filter != None`, ne renvoie que les clippers d'une promotion donnée.
"""
search_res = self.search()
clipper_list = []
for entry in search_res:
uid = self.extract_ldap_info(entry, "uid")
# Il y a des comptes "bizarre" (e.g. `root`) avec uid vide
if len(uid) > 0:
try:
name = self.extract_ldap_info(entry, "cn")
email = self.extract_ldap_info(entry, "mail")
promo, dept = self.parse_dept(
self.extract_ldap_info(entry, "homeDirectory")
)
if promo_filter is None or promo == promo_filter:
clipper_list.append(
ClipperAccount(uid, name, email, promo, dept)
)
if verbosity == 3:
stdout.write("Compte clipper trouvé : {}".format(uid))
except ValueError:
if verbosity >= 2:
stdout.write("Entrée malformée trouvée : {}".format(entry))
pass
return clipper_list
class AnnuaireLDAP(LDAP):
LDAP_SERVER = settings.LDAP["CRI"]
search_base = "ou=people,dc=ens,dc=fr"
attr_list = ["uid", "sn", "givenName", "ou"]
def try_match(self, profile, verbosity=1, stderr=None):
"""Essaie de trouver une entrée correspondant au profile donné dans
l'annuaire de l'ENS. L'heuristique est la suivante : il est très probable
que le prénom de la personne commence par le premier mot de `profile.full_name`,
et que son nom de famille finisse par le dernier mot.
"""
given_name = profile.full_name.split(" ")[0]
last_name = profile.full_name.split(" ")[-1]
search_name = self.search(
"(&(givenName={}*)(sn=*{}))".format(given_name, last_name)
)
if len(search_name) > 0:
if len(search_name) > 2:
if verbosity >= 2:
stderr.write(
"Erreur : deux logins CRI trouvés pour {} ({})".format(
profile.user.username, profile.promotion
)
)
return None
return self.extract_ldap_info(search_name[0], "uid")
return None

View file

@ -0,0 +1,103 @@
from datetime import date
from django.contrib.auth.models import User
from django.core.management.base import BaseCommand, CommandError
from fiches.models import Department, Profile
from ._ldap import ClipperLDAP
class Command(BaseCommand):
help = "Crée les fiches annuaire des conscrit·e·s automatiquement"
def add_arguments(self, parser):
group = parser.add_mutually_exclusive_group()
group.add_argument(
"--all", action="store_true", help="Importe l'intégralité des promotions"
)
group.add_argument("--promo", type=int, help="Spécifie la promotion à importer")
def get_current_promo(self):
today = date.today()
year = today.year
if today.month < 9:
year -= 1
return year
def handle(self, *args, **options):
if options["all"]:
promo = None
elif options["promo"] is not None:
promo = options["promo"]
if promo < 100:
promo = 2000 + promo
else:
promo = self.get_current_promo()
if promo < 2000 or promo > 2100:
raise CommandError("Promotion invalide : {}".format(promo))
verbosity = options["verbosity"]
# On récupère la liste des élèves à créer
ldap = ClipperLDAP()
clipper_list = ldap.get_clipper_list(
promo_filter=promo, verbosity=verbosity, stdout=self.stdout
)
# On vire les élèves déjà existants
existing_users = set(User.objects.values_list("username", flat=True))
clippers = [
clipper for clipper in clipper_list if clipper.uid not in existing_users
]
# Les départements sont créés à la main ; il y en a peu.
depts = {
dept: Department.objects.get_or_create(name=dept)[0].id
for dept in ldap.verbose_depts.values()
}
users_to_create = []
profiles_to_create = []
dept_m2m_to_create = []
for clipper in clippers:
user = User(username=clipper.uid, email=clipper.email)
profile = Profile(user=user, full_name=clipper.name, promotion=clipper.year)
users_to_create.append(user)
profiles_to_create.append(profile)
dept_m2m_to_create.append(
Profile.department.through(
profile=profile, department_id=depts[clipper.dept]
)
)
# À décommenter pour utilisation locale (avec SQLite)
# def _manual_ids(cls, to_create):
# pid = getattr(cls.objects.order_by("-id").first(), "id", 1)
# for p in to_create:
# pid += 1
# p.id = pid
# _manual_ids(User, users_to_create)
# _manual_ids(Profile, profiles_to_create)
# _manual_ids(Profile.department.through, dept_m2m_to_create)
User.objects.bulk_create(users_to_create)
for profile in profiles_to_create:
profile.user_id = profile.user.id
Profile.objects.bulk_create(profiles_to_create)
for dept_m2m in dept_m2m_to_create:
dept_m2m.profile_id = dept_m2m.profile.id
Profile.department.through.objects.bulk_create(dept_m2m_to_create)
if verbosity >= 1:
self.stdout.write(
(
"Création de {} utilisateur·ices et de {}"
" profils effectuée avec succès"
).format(len(users_to_create), len(profiles_to_create))
)

View file

@ -0,0 +1,96 @@
from datetime import date
from io import BytesIO
from urllib.error import HTTPError
from urllib.request import urlopen
from django.conf import settings
from django.core.files import File
from django.core.management.base import BaseCommand
from fiches.models import Profile
from ._ldap import AnnuaireLDAP
class Command(BaseCommand):
help = "Si possible, import les photos des conscrit·e·s dans l'annuaire."
def add_arguments(self, parser):
group = parser.add_mutually_exclusive_group()
group.add_argument(
"--all", action="store_true", help="Importe l'intégralité des promotions"
)
group.add_argument("--promo", type=int, help="Spécifie la promotion à importer")
def get_current_promo(self):
today = date.today()
year = today.year
if today.month < 9:
year -= 1
return year
def handle(self, *args, **options):
if options["all"]:
promo = None
elif options["promo"] is not None:
promo = options["promo"]
if promo < 100:
promo = 2000 + promo
else:
promo = self.get_current_promo()
verbosity = options["verbosity"]
no_images = Profile.objects.select_related("user").filter(picture="")
if promo is not None:
no_images = no_images.filter(promotion=promo)
cri_ldap = AnnuaireLDAP()
base_annuaire = "{PROTOCOL}://{URL}:{PORT}".format(**settings.ANNUAIRE)
success = 0
for profile in no_images:
cri_login = cri_ldap.try_match(
profile, verbosity=verbosity, stderr=self.stderr
)
if cri_login is not None:
img_url = "{}/photos/{}.jpg".format(base_annuaire, cri_login)
try:
istream = urlopen(img_url)
profile.picture.save(
name="{}.jpg".format(profile.user.username),
content=File(BytesIO(istream.read())),
)
success += 1
if verbosity == 3:
self.stdout.write(
"Photo trouvée pour {} ({})".format(
profile.user.username, profile.promotion
)
)
except HTTPError:
# Parfois, même si on trouve un login CRI, il y a une erreur 404.
# Dans ce cas, pas de photo : on échoue gracieusement.
if verbosity >= 2:
self.stdout.write(
"Login CRI trouvé mais pas de photo pour {} ({})".format(
profile.user.username, profile.promotion
)
)
pass
elif verbosity >= 2:
self.stdout.write(
"Pas de login CRI trouvé pour {} ({})".format(
profile.user.username, profile.promotion
)
)
if verbosity >= 1:
self.stdout.write(
"{} profils traités ; {} images importées.".format(
no_images.count(), success
)
)

View file

@ -0,0 +1,23 @@
# Generated by Django 2.2.17 on 2020-11-13 10:38
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('fiches', '0007_auto_20200917_1421'),
]
operations = [
migrations.AlterField(
model_name='department',
name='name',
field=models.CharField(max_length=255, unique=True, verbose_name='nom du département'),
),
migrations.AlterField(
model_name='profile',
name='birth_date',
field=models.DateField(blank=True, null=True, verbose_name='date de naissance'),
),
]

View file

@ -23,7 +23,9 @@ class Profile(models.Model):
promotion = models.IntegerField(
validators=[MinValueValidator(1980)], verbose_name=_("promotion")
)
birth_date = models.DateField(blank=True, verbose_name=_("date de naissance"))
birth_date = models.DateField(
blank=True, null=True, verbose_name=_("date de naissance")
)
thurne = models.CharField(blank=True, max_length=100, verbose_name=_("thurne"))
text_field = models.TextField(blank=True, verbose_name=_("champ libre"))
printing = models.BooleanField(
@ -41,7 +43,9 @@ class Profile(models.Model):
class Department(models.Model):
name = models.CharField(max_length=255, verbose_name=_("nom du département"))
name = models.CharField(
max_length=255, verbose_name=_("nom du département"), unique=True
)
def __str__(self):
return self.name

View file

@ -1,3 +1,4 @@
django==2.2.*
Pillow
django_cas_ng
python-ldap