kadenios/elections/utils.py

421 lines
14 KiB
Python

import csv
import io
import smtplib
import networkx as nx
import numpy as np
from networkx.algorithms.dag import ancestors, descendants
from django.contrib.auth import get_user_model
from django.contrib.auth.hashers import make_password
from django.core.exceptions import ValidationError
from django.core.mail import EmailMessage
from django.core.validators import validate_email
from django.template.loader import render_to_string
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
from shared.auth.utils import generate_password
# #############################################################################
# Classes pour différencier les différents types de questions
# #############################################################################
class CastFunctions:
"""Classe pour enregistrer les votes"""
def cast_select(user, vote_form):
"""On enregistre un vote classique"""
selected, n_selected = [], []
for v in vote_form:
if v.cleaned_data["selected"]:
selected.append(v.instance)
else:
n_selected.append(v.instance)
user.votes.add(*selected)
user.votes.remove(*n_selected)
def cast_rank(user, vote_form):
"""On enregistre un vote par classement"""
from .models import Rank, Vote
# On enregistre les votes pour pouvoir créér les classements
options = [v.instance for v in vote_form]
user.votes.add(*options)
votes = {
v.option: v for v in Vote.objects.filter(user=user, option__in=options)
}
ranks_create = []
ranks_update = []
for v in vote_form:
vote = votes[v.instance]
if hasattr(vote, "rank"):
vote.rank.rank = v.cleaned_data["rank"]
ranks_update.append(vote.rank)
else:
ranks_create.append(Rank(vote=vote, rank=v.cleaned_data["rank"]))
Rank.objects.bulk_update(ranks_update, ["rank"])
Rank.objects.bulk_create(ranks_create)
class TallyFunctions:
"""Classe pour gérer les dépouillements"""
def tally_select(question):
"""On dépouille un vote classique"""
from .models import Option
max_votes = 0
options = []
for o in question.options.prefetch_related("voters").all():
o.nb_votes = o.voters.count()
max_votes = max(max_votes, o.nb_votes)
options.append(o)
for o in options:
o.winner = o.nb_votes == max_votes
question.max_votes = max_votes
question.save()
Option.objects.bulk_update(options, ["nb_votes", "winner"])
def tally_schultze(question):
"""On dépouille un vote par classement et on crée la matrice des duels"""
from .models import Duel, Option, Rank
options = list(question.options.all())
nb_options = len(options)
ranks = Rank.objects.select_related("vote").filter(vote__option__in=options)
ranks_by_user = {}
for r in ranks:
user = r.vote.user
if user in ranks_by_user:
ranks_by_user[user].append(r)
else:
ranks_by_user[user] = [r]
ballots = []
# Pour chaque votant·e, on regarde son classement
for user in ranks_by_user:
votes = ranks_by_user[user]
ballot = np.zeros((nb_options, nb_options))
for i in range(nb_options):
for j in range(i):
ballot[i, j] = int(votes[i].rank < votes[j].rank)
ballot[j, i] = int(votes[j].rank < votes[i].rank)
ballots.append(ballot)
if ballots != []:
# On additionne les classements de tout le monde pour avoir la matrice
# des duels
duels = sum(ballots)
# Configuration du graphe
graph = nx.DiGraph()
graph.add_nodes_from(options)
cells = []
# On enregistre la matrice
for i in range(nb_options):
for j in range(nb_options):
if duels[i, j] > duels[j, i]:
graph.add_edge(
options[i], options[j], weight=(duels[i, j] - duels[j, i])
)
if duels[i, j] > 0:
cells.append(
Duel(
question=question,
winner=options[i],
loser=options[j],
amount=duels[i, j],
)
)
Duel.objects.bulk_create(cells)
# On utilise la méthode de Schwartz pour trouver les options gagnantes
while graph.edges():
losers = set() # Les options qui se font battre strictement
for n in graph.nodes():
losers |= set(descendants(graph, n)) - set(ancestors(graph, n))
if losers:
# On supprime les options perdantes
for n in losers:
graph.remove_node(n)
else:
# On n'a pas d'options perdantes, on supprime les arêtes de poids
# le plus faible
min_weight = min(nx.get_edge_attributes(graph, "weight").values())
min_edges = []
for (u, v) in graph.edges():
if graph[u][v]["weight"] == min_weight:
min_edges.append((u, v))
for (u, v) in min_edges:
graph.remove_edge(u, v)
# Les options gagnantes sont celles encore présentes dans le graphe
winners = graph.nodes()
for o in options:
o.winner = o in winners
Option.objects.bulk_update(options, ["winner"])
class ValidateFunctions:
"""Classe pour valider les formsets selon le type de question"""
def always_true(vote_form):
"""Retourne True pour les votes sans validation particulière"""
return True
def unique_selected(vote_form):
"""Vérifie qu'une seule option est choisie"""
nb_selected = 0
for v in vote_form:
nb_selected += v.cleaned_data["selected"]
if nb_selected == 0:
vote_form._non_form_errors.append(
ValidationError(_("Vous devez sélectionnner une option."))
)
return False
elif nb_selected > 1:
vote_form._non_form_errors.append(
ValidationError(_("Vous ne pouvez pas sélectionner plus d'une option."))
)
return False
return True
def limit_ranks(vote_form):
"""Limite le classement au nombre d'options"""
nb_options = len(vote_form)
valid = True
for v in vote_form:
rank = v.cleaned_data["rank"]
if rank is None:
rank = nb_options
if rank > nb_options:
v.add_error(
"rank", _("Le classement maximal est {}.").format(nb_options)
)
valid = False
elif rank < 1:
v.add_error("rank", _("Le classement minimal est 1."))
valid = False
v.cleaned_data["rank"] = rank # On ajoute le défaut
return valid
class ResultsData:
"""Classe pour afficher des informations supplémentaires après la fin d'une élection"""
def select(question):
"""On renvoie l'explication des couleurs"""
return render_to_string("elections/results/select.html")
def rank(question):
"""On récupère la matrice des résultats et on l'affiche"""
duels = question.duels.all()
options = list(question.options.all())
n = len(options)
_matrix = np.zeros((n, n), dtype=int)
matrix = np.zeros((n, n), dtype=tuple)
for d in duels:
i, j = options.index(d.loser), options.index(d.winner)
_matrix[i, j] = d.amount
for i in range(n):
for j in range(n):
if _matrix[i, j] > _matrix[j, i]:
matrix[i, j] = (_matrix[i, j], "is-success")
elif _matrix[i, j] < _matrix[j, i]:
matrix[i, j] = (_matrix[i, j], "is-danger")
else:
matrix[i, j] = (_matrix[i, j], "")
matrix = zip(matrix.tolist(), options)
return render_to_string(
"elections/results/rank.html",
{"q": question, "matrix": matrix, "options": options},
)
class BallotsData:
"""Classe pour afficher les bulletins d'une question"""
def select(question):
"""Renvoie un tableau affichant les options sélectionnées pour chaque bulletin"""
from .models import Vote
votes = Vote.objects.filter(option__question=question).select_related("user")
options = list(question.options.all())
ballots = {}
for v in votes:
ballot = ballots.get(v.user, [False] * len(options))
ballot[options.index(v.option)] = True
ballots[v.user] = ballot
return render_to_string(
"elections/ballots/select.html", {"options": options, "ballots": ballots}
)
def rank(question):
"""Renvoie un tableau contenant les classements des options par bulletin"""
from .models import Rank
options = list(question.options.all())
ranks = Rank.objects.select_related("vote__user").filter(
vote__option__in=options
)
ranks_by_user = {}
for r in ranks:
user = r.vote.user
if user in ranks_by_user:
ranks_by_user[user].append(r.rank)
else:
ranks_by_user[user] = [r.rank]
return render_to_string(
"elections/ballots/rank.html",
{"options": options, "ballots": ranks_by_user},
)
# #############################################################################
# Fonctions pour importer une liste de votant·e·s
# #############################################################################
def create_users(election, csv_file):
"""Crée les votant·e·s pour l'élection donnée, en remplissant les champs
`username`, `election` et `full_name`.
"""
dialect = csv.Sniffer().sniff(csv_file.readline().decode("utf-8"))
csv_file.seek(0)
reader = csv.reader(io.StringIO(csv_file.read().decode("utf-8")), dialect)
for (username, full_name, email) in reader:
election.registered_voters.create(
username=f"{election.id}__{username}", email=email, full_name=full_name
)
def check_csv(csv_file):
"""Vérifie que le fichier donnant la liste de votant·e·s est bien formé"""
try:
dialect = csv.Sniffer().sniff(csv_file.readline().decode("utf-8"))
except csv.Error:
return [
_(
"Format invalide. Vérifiez que le fichier est bien formé (i.e. "
"chaque ligne de la forme 'login,nom,email')."
)
]
csv_file.seek(0)
reader = csv.reader(io.StringIO(csv_file.read().decode("utf-8")), dialect)
errors = []
users = {}
line_nb = 0
for line in reader:
line_nb += 1
if len(line) != 3:
errors.append(
_("La ligne {} n'a pas le bon nombre d'éléments.").format(line_nb)
)
else:
if line[0] == "":
errors.append(
_("Valeur manquante dans la ligne {} : 'login'.").format(line_nb)
)
else:
if line[0] in users:
errors.append(
_("Doublon dans les logins : lignes {} et {}.").format(
line_nb, users[line[0]]
)
)
else:
users[line[0]] = line_nb
if line[1] == "":
errors.append(
_("Valeur manquante dans la ligne {} : 'nom'.").format(line_nb)
)
try:
validate_email(line[2])
except ValidationError:
errors.append(
_("Adresse mail invalide à la ligne {} : '{}'.").format(
line_nb, line[2]
)
)
return errors
def send_mail(election, subject, body, reply_to):
"""Envoie le mail d'annonce de l'élection avec identifiants et mot de passe
aux votant·e·s, le mdp est généré en même temps que le mail est envoyé.
"""
User = get_user_model()
# On n'envoie le mail qu'aux personnes qui n'en n'ont pas déjà reçu un
voters = list(election.registered_voters.exclude(has_valid_email=True))
e_url = reverse("election.view", args=[election.id])
url = f"https://vote.eleves.ens.fr{e_url}"
messages = []
for v in voters:
password = generate_password()
v.password = make_password(password)
messages.append(
(
EmailMessage(
subject=subject,
body=body.format(
full_name=v.full_name,
election_url=url,
username=v.base_username,
password=password,
),
to=[v.email],
reply_to=[reply_to],
),
v,
)
)
# get_connection(fail_silently=False).send_messages(messages)
for (m, v) in messages:
try:
m.send()
except smtplib.SMTPException:
v.has_valid_email = False
else:
v.has_valid_email = True
User.objects.bulk_update(voters, ["password", "has_valid_email"])