"""One-off migration of the legacy directory records into the new models.

Reads the clipper accounts from LDAP, creates the matching users and CAS
accounts, then rebuilds profiles, phone numbers, e-mail addresses, postal
addresses and department links from the ``old_fiches.json`` dump.
"""

import json
import os
import sys
from datetime import datetime

import django
from django.contrib.auth import get_user_model

print("\nTransfert des fiches annuaires :")

# Configuration
print("Paramétrage de Django...", end=" ")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "annuaire.settings.prod")
django.setup()

User = get_user_model()

# Project imports are deliberately placed after django.setup() (hence the noqa).
from authens.models import CASAccount  # noqa
from fiches.management.commands._ldap import ClipperLDAP  # noqa
from fiches.models import Address, Department, Mail, Phone, Profile  # noqa

print("[ok]")


# Utilities


def parse_date(s):
    """Parse a ``YYYY-MM-DD`` string into a date; return None if it is invalid."""
    try:
        return datetime.strptime(s, "%Y-%m-%d").date()
    except (ValueError, TypeError):
        return None


def get_text_field(data):
    """Build the free-text field from the legacy interests/websites/comments.

    Each non-empty section is followed by a blank line; empty or missing
    sections are left out.
    """
    sections = []
    if data["interests"]:
        sections.append("Mes intérêts :\n" + data["interests"])
    if data["websites"]:
        sections.append("Mes pages web :\n\n" + data["websites"])
    if data["comments"]:
        sections.append(data["comments"])
    return "".join(section + "\n\n" for section in sections)


def get_address(data):
    """Format a legacy address record as ``lines`` + newline + locality.

    The locality line joins the zip code, city and country that are present
    with ``", "``. Missing (None or empty) parts are skipped, so a partial
    record neither raises nor produces a dangling separator.
    """
    street = data["lines"] or ""
    locality_parts = (data["zip"], data["city"], data["country"])
    locality = ", ".join(part for part in locality_parts if part)
    return street + "\n" + locality


def parse_list(s):
    """Split a ``,`` / ``;`` / newline separated string into clean entries.

    Surrounding whitespace is stripped from every entry and blank entries are
    dropped. None is treated as an empty string.
    """
    normalized = (s or "").replace(",", ";").replace("\n", ";")
    return [entry.strip() for entry in normalized.split(";") if entry.strip()]


def create_phones(s, profile, numeros, name="Téléphone"):
    """Append one unsaved Phone per entry of *s* to the *numeros* list."""
    numeros.extend(
        Phone(profile=profile, name=name, number=number) for number in parse_list(s)
    )


def create_mails(s, profile, mails, name="E-mail"):
    """Append one unsaved Mail per entry of *s* to the *mails* list."""
    mails.extend(
        Mail(profile=profile, name=name, mail=address) for address in parse_list(s)
    )


print("Récupération des comptes clipper...", end=" ")
ldap = ClipperLDAP()
clippers = {c.uid: c for c in ldap.get_clipper_list(stdout=sys.stdout)}
print("[ok]")

# Department name -> primary key, creating the departments that do not exist.
depts = {
    dept: Department.objects.get_or_create(name=dept)[0].id
    for dept in ldap.verbose_depts.values()
}

dept_m2m_to_create = []

print("Création des comptes...", end=" ")
users_to_create = [
    User(username=clipper.uid, email=clipper.email) for clipper in clippers.values()
]
User.objects.bulk_create(users_to_create)
# Re-read the users from the database. Accounts that are not in the LDAP
# listing (e.g. users that existed before the migration) are skipped instead
# of raising KeyError.
users = {
    u.username: (u, clippers[u.username])
    for u in User.objects.all()
    if u.username in clippers
}
print("[ok]")

print("Création des comptes Authens...", end=" ")
cas_accounts = [
    CASAccount(user=u, cas_login=c.uid, entrance_year=c.year)
    for (u, c) in users.values()
]
CASAccount.objects.bulk_create(cas_accounts)
print("[ok]")

fiches = {}  # login -> unsaved Profile
adresses = []
numeros = []
mails = []
fiches_pk = {}  # legacy primary key -> login of the imported profile


def find_fiche(pk):
    """Return the Profile imported for legacy primary key *pk*, or None."""
    return fiches.get(fiches_pk.get(pk))


print("Récupération des anciennes fiches :")
# Recreate the profiles
with open("old_fiches.json", encoding="utf-8") as json_file:
    data = json.load(json_file)

# Single pass: a related record (nickname, address, motto) is attached only if
# the fiche sharing its primary key has already been seen in the dump.
for obj in data:
    if obj["model"] == "annuaire.fiches2021":
        obj_data = obj["fields"]
        login = obj_data["login"]
        # Only the login lookup is guarded, so that an unrelated KeyError is
        # not misreported as an unknown login.
        try:
            user, clipper = users[login]
        except KeyError:
            print(f"\tLogin inconnu : {login}")
            continue

        profile = Profile(
            user=user,
            full_name=" ".join((obj_data["firstname"], obj_data["lastname"])),
            promotion=clipper.year,
            birth_date=parse_date((obj_data["birthdate"] or "")),
            past_studies=(
                (obj_data["past_studies"] or "") + "\n" + (obj_data["studies"] or "")
            ).strip(),
            experiences=(obj_data["experiences"] or ""),
            thurne=(obj_data["room"] or ""),
            text_field=get_text_field(obj_data),
        )
        fiches_pk[obj["pk"]] = user.username
        fiches[login] = profile

        create_phones(obj_data["phones"], profile, numeros)
        create_phones(obj_data["mobiles"], profile, numeros, name="Portable")
        create_mails(obj_data["emails"], profile, mails)

        if clipper.dept in depts:
            dept_m2m_to_create.append(
                Profile.department.through(
                    profile=profile,
                    department_id=depts[clipper.dept],
                )
            )
        else:
            # The profile is still imported, just without a department link.
            print(f"\tDépartement inconnu pour {login} : {clipper.dept}")

    elif obj["model"] == "annuaire.lesreferences2021":
        fiche = find_fiche(obj["pk"])
        surnom = obj["fields"]["nickname"]
        if fiche is not None and surnom:
            # Several nicknames are accumulated as a comma-separated list.
            if fiche.nickname:
                fiche.nickname += ", "
            fiche.nickname += surnom

    elif obj["model"] == "annuaire.adresses2021":
        fiche = find_fiche(obj["pk"])
        if fiche is not None:
            a = get_address(obj["fields"]).strip()
            if a:
                adresses.append(Address(profile=fiche, name="Adresse", content=a))

    elif obj["model"] == "annuaire.devises2021":
        fiche = find_fiche(obj["pk"])
        if fiche is not None:
            obj_data = obj["fields"]
            # NOTE(review): source and author are appended independently of
            # the quote being present -- confirm this is the intended nesting.
            if obj_data["quote"]:
                fiche.text_field += f"\n{obj_data['quote']}"
            if obj_data["source"]:
                fiche.text_field += f" ({obj_data['source']})"
            if obj_data["author"]:
                fiche.text_field += f", ({obj_data['author']})"

print("Création des nouvelles fiches fiches...", end=" ")
Profile.objects.bulk_create(fiches.values())
# Saved profiles are re-read from the database and indexed by username;
# presumably because bulk_create does not populate primary keys on every
# database backend -- TODO confirm.
profils = {p.user.username: p for p in Profile.objects.select_related("user")}
print("[ok]")


def attach_saved_profiles(objs):
    """Point every object of *objs* at the saved Profile of the same user."""
    for related in objs:
        related.profile = profils[related.profile.user.username]


print("Rattachement des départements...", end=" ")
attach_saved_profiles(dept_m2m_to_create)
Profile.department.through.objects.bulk_create(dept_m2m_to_create)
print("[ok]")

print("Création des numéros de téléphone...", end=" ")
attach_saved_profiles(numeros)
Phone.objects.bulk_create(numeros)
print("[ok]")

print("Création des adresses mail...", end=" ")
attach_saved_profiles(mails)
Mail.objects.bulk_create(mails)
print("[ok]")

print("Création des adresses...", end=" ")
attach_saved_profiles(adresses)
Address.objects.bulk_create(adresses)
print("[ok]")