import csv import io import smtplib import networkx as nx import numpy as np from networkx.algorithms.dag import ancestors, descendants from django.contrib.auth import get_user_model from django.contrib.auth.hashers import make_password from django.core.exceptions import ValidationError from django.core.mail import EmailMessage from django.core.validators import validate_email from django.template.loader import render_to_string from django.urls import reverse from django.utils.translation import gettext_lazy as _ from shared.auth.utils import generate_password # ############################################################################# # Classes pour différencier les différents types de questions # ############################################################################# class CastFunctions: """Classe pour enregistrer les votes""" def cast_select(user, vote_form): """On enregistre un vote classique""" selected, n_selected = [], [] for v in vote_form: if v.cleaned_data["selected"]: selected.append(v.instance) else: n_selected.append(v.instance) user.votes.add(*selected) user.votes.remove(*n_selected) def cast_rank(user, vote_form): """On enregistre un vote par classement""" from .models import Rank, Vote # On enregistre les votes pour pouvoir créér les classements options = [v.instance for v in vote_form] user.votes.add(*options) votes = { v.option: v for v in Vote.objects.filter(user=user, option__in=options) } ranks_create = [] ranks_update = [] for v in vote_form: vote = votes[v.instance] if hasattr(vote, "rank"): vote.rank.rank = v.cleaned_data["rank"] ranks_update.append(vote.rank) else: ranks_create.append(Rank(vote=vote, rank=v.cleaned_data["rank"])) Rank.objects.bulk_update(ranks_update, ["rank"]) Rank.objects.bulk_create(ranks_create) class TallyFunctions: """Classe pour gérer les dépouillements""" def tally_select(question): """On dépouille un vote classique""" from .models import Option max_votes = 0 options = [] for o in question.options.prefetch_related("voters").all(): o.nb_votes = o.voters.count() max_votes = max(max_votes, o.nb_votes) options.append(o) for o in options: o.winner = o.nb_votes == max_votes question.max_votes = max_votes question.save() Option.objects.bulk_update(options, ["nb_votes", "winner"]) def tally_schultze(question): """On dépouille un vote par classement et on crée la matrice des duels""" from .models import Duel, Option, Rank options = list(question.options.all()) nb_options = len(options) ranks = Rank.objects.select_related("vote").filter(vote__option__in=options) ranks_by_user = {} for r in ranks: user = r.vote.user if user in ranks_by_user: ranks_by_user[user].append(r) else: ranks_by_user[user] = [r] ballots = [] # Pour chaque votant·e, on regarde son classement for user in ranks_by_user: votes = ranks_by_user[user] ballot = np.zeros((nb_options, nb_options)) for i in range(nb_options): for j in range(i): ballot[i, j] = int(votes[i].rank < votes[j].rank) ballot[j, i] = int(votes[j].rank < votes[i].rank) ballots.append(ballot) if ballots != []: # On additionne les classements de tout le monde pour avoir la matrice # des duels duels = sum(ballots) # Configuration du graphe graph = nx.DiGraph() graph.add_nodes_from(options) cells = [] # On enregistre la matrice for i in range(nb_options): for j in range(nb_options): if duels[i, j] > duels[j, i]: graph.add_edge( options[i], options[j], weight=(duels[i, j] - duels[j, i]) ) if duels[i, j] > 0: cells.append( Duel( question=question, winner=options[i], loser=options[j], amount=duels[i, j], ) ) Duel.objects.bulk_create(cells) # On utilise la méthode de Schwartz pour trouver les options gagnantes while graph.edges(): losers = set() # Les options qui se font battre strictement for n in graph.nodes(): losers |= set(descendants(graph, n)) - set(ancestors(graph, n)) if losers: # On supprime les options perdantes for n in losers: graph.remove_node(n) else: # On n'a pas d'options perdantes, on supprime les arêtes de poids # le plus faible min_weight = min(nx.get_edge_attributes(graph, "weight").values()) min_edges = [] for (u, v) in graph.edges(): if graph[u][v]["weight"] == min_weight: min_edges.append((u, v)) for (u, v) in min_edges: graph.remove_edge(u, v) # Les options gagnantes sont celles encore présentes dans le graphe winners = graph.nodes() for o in options: o.winner = o in winners Option.objects.bulk_update(options, ["winner"]) class ValidateFunctions: """Classe pour valider les formsets selon le type de question""" def always_true(vote_form): """Retourne True pour les votes sans validation particulière""" return True def unique_selected(vote_form): """Vérifie qu'une seule option est choisie""" nb_selected = 0 for v in vote_form: nb_selected += v.cleaned_data["selected"] if nb_selected == 0: vote_form._non_form_errors.append( ValidationError(_("Vous devez sélectionnner une option.")) ) return False elif nb_selected > 1: vote_form._non_form_errors.append( ValidationError(_("Vous ne pouvez pas sélectionner plus d'une option.")) ) return False return True def limit_ranks(vote_form): """Limite le classement au nombre d'options""" nb_options = len(vote_form) valid = True for v in vote_form: rank = v.cleaned_data["rank"] if rank is None: rank = nb_options if rank > nb_options: v.add_error( "rank", _("Le classement maximal est {}.").format(nb_options) ) valid = False elif rank < 1: v.add_error("rank", _("Le classement minimal est 1.")) valid = False v.cleaned_data["rank"] = rank # On ajoute le défaut return valid class ResultsData: """Classe pour afficher des informations supplémentaires après la fin d'une élection""" def select(question): """On renvoie l'explication des couleurs""" return render_to_string("elections/results/select.html") def rank(question): """On récupère la matrice des résultats et on l'affiche""" duels = question.duels.all() options = list(question.options.all()) n = len(options) _matrix = np.full((n, n), {"value": 0}, dtype=dict) matrix = np.empty((n, n), dtype=tuple) for d in duels: i, j = options.index(d.loser), options.index(d.winner) _matrix[i, j] = { "value": d.amount, "winner": d.winner.get_abbr(j + 1), "loser": d.loser.get_abbr(i + 1), } for i in range(n): for j in range(n): if _matrix[i, j]["value"] > _matrix[j, i]["value"]: matrix[i, j] = (_matrix[i, j], "is-success") elif _matrix[i, j]["value"] < _matrix[j, i]["value"]: matrix[i, j] = (_matrix[i, j], "is-danger") else: matrix[i, j] = (_matrix[i, j], "") matrix = zip(matrix.tolist(), options) return render_to_string( "elections/results/rank.html", {"q": question, "matrix": matrix, "options": options}, ) class BallotsData: """Classe pour afficher les bulletins d'une question""" def select(question): """Renvoie un tableau affichant les options sélectionnées pour chaque bulletin""" from .models import Vote votes = Vote.objects.filter(option__question=question).select_related("user") options = list(question.options.all()) ballots = {} for v in votes: ballot = ballots.get(v.user, [False] * len(options)) ballot[options.index(v.option)] = True ballots[v.user] = ballot return render_to_string( "elections/ballots/select.html", {"options": options, "ballots": sorted(ballots.values(), reverse=True)}, ) def rank(question): """Renvoie un tableau contenant les classements des options par bulletin""" from .models import Rank options = list(question.options.all()) ranks = Rank.objects.select_related("vote__user").filter( vote__option__in=options ) ranks_by_user = {} for r in ranks: user = r.vote.user if user in ranks_by_user: ranks_by_user[user].append(r.rank) else: ranks_by_user[user] = [r.rank] return render_to_string( "elections/ballots/rank.html", {"options": options, "ballots": sorted(ranks_by_user.values())}, ) # ############################################################################# # Fonctions pour importer une liste de votant·e·s # ############################################################################# def create_users(election, csv_file): """Crée les votant·e·s pour l'élection donnée, en remplissant les champs `username`, `election` et `full_name`. """ User = get_user_model() dialect = csv.Sniffer().sniff(csv_file.readline().decode("utf-8")) csv_file.seek(0) reader = csv.reader(io.StringIO(csv_file.read().decode("utf-8")), dialect) users = [ User( election=election, username=f"{election.id}__{username}", email=email, full_name=full_name, is_active=False, ) for (username, full_name, email) in reader ] User.objects.bulk_create(users) def check_csv(csv_file): """Vérifie que le fichier donnant la liste de votant·e·s est bien formé""" try: dialect = csv.Sniffer().sniff(csv_file.readline().decode("utf-8")) except csv.Error: return [ _( "Format invalide. Vérifiez que le fichier est bien formé (i.e. " "chaque ligne de la forme 'login,nom,email')." ) ] csv_file.seek(0) reader = csv.reader(io.StringIO(csv_file.read().decode("utf-8")), dialect) errors = [] users = {} line_nb = 0 for line in reader: line_nb += 1 if len(line) != 3: errors.append( _("La ligne {} n'a pas le bon nombre d'éléments.").format(line_nb) ) else: if line[0] == "": errors.append( _("Valeur manquante dans la ligne {} : 'login'.").format(line_nb) ) else: if line[0] in users: errors.append( _("Doublon dans les logins : lignes {} et {}.").format( line_nb, users[line[0]] ) ) else: users[line[0]] = line_nb if line[1] == "": errors.append( _("Valeur manquante dans la ligne {} : 'nom'.").format(line_nb) ) try: validate_email(line[2]) except ValidationError: errors.append( _("Adresse mail invalide à la ligne {} : '{}'.").format( line_nb, line[2] ) ) return errors def send_mail(election, subject, body, reply_to): """Envoie le mail d'annonce de l'élection avec identifiants et mot de passe aux votant·e·s, le mdp est généré en même temps que le mail est envoyé. """ User = get_user_model() # On n'envoie le mail qu'aux personnes qui n'en n'ont pas déjà reçu un voters = list(election.registered_voters.exclude(has_valid_email=True)) e_url = reverse("election.view", args=[election.id]) url = f"https://vote.eleves.ens.fr{e_url}" start = election.start_date.strftime("%d/%m/%Y %H:%M %Z") end = election.end_date.strftime("%d/%m/%Y %H:%M %Z") messages = [] for v in voters: password = generate_password() v.password = make_password(password) messages.append( ( EmailMessage( subject=subject, body=body.format( full_name=v.full_name, election_url=url, start=start, end=end, username=v.base_username, password=password, ), to=[v.email], reply_to=[reply_to], # On modifie l'adresse de retour d'erreur headers={"Return-Path": "kadenios@www.eleves.ens.fr"}, ), v, ) ) for (m, v) in messages: try: m.send() except smtplib.SMTPException: v.has_valid_email = False else: v.has_valid_email = True User.objects.bulk_update(voters, ["password", "has_valid_email"])