468 lines
15 KiB
Python
468 lines
15 KiB
Python
import csv
|
|
import io
|
|
import smtplib
|
|
from typing import TYPE_CHECKING, TypeGuard
|
|
|
|
import networkx as nx
|
|
import numpy as np
|
|
from networkx.algorithms.dag import ancestors, descendants
|
|
from numpy._typing import NDArray
|
|
|
|
from django.contrib.auth import get_user_model
|
|
from django.contrib.auth.hashers import make_password
|
|
from django.core.exceptions import ValidationError
|
|
from django.core.files.base import File
|
|
from django.core.mail import EmailMessage
|
|
from django.core.validators import validate_email
|
|
from django.forms import BaseFormSet
|
|
from django.template.loader import render_to_string
|
|
from django.urls import reverse
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
|
from shared.auth.utils import generate_password
|
|
|
|
if TYPE_CHECKING:
|
|
from elections.forms import RankVoteForm, SelectVoteForm
|
|
from elections.models import Election, Question, RankedVote, Vote
|
|
from elections.typing import User
|
|
|
|
|
|
# #############################################################################
|
|
# Classes pour différencier les différents types de questions
|
|
# #############################################################################
|
|
|
|
|
|
def has_rank(v: "Vote") -> TypeGuard["RankedVote"]:
|
|
return hasattr(v, "rank")
|
|
|
|
|
|
class CastFunctions:
    """Record the ballots submitted by a voter."""

    @staticmethod
    def cast_select(user: "User", vote_form: "BaseFormSet[SelectVoteForm]"):
        """Record a plain selection vote.

        The instances of the forms whose `selected` value is truthy are added
        to `user.votes`; the instances of all other forms are removed from it.
        """
        by_choice = {True: [], False: []}
        for form in vote_form:
            by_choice[bool(form.cleaned_data["selected"])].append(form.instance)

        user.votes.add(*by_choice[True])
        user.votes.remove(*by_choice[False])

    @staticmethod
    def cast_rank(user: "User", vote_form: "BaseFormSet[RankVoteForm]"):
        """Record a ranked vote.

        Every option of the formset is first added to `user.votes`, then the
        rank given in each form is stored: existing `Rank` rows are updated in
        bulk, missing ones are created in bulk.
        """
        from .models import Rank, Vote

        # The votes must exist before ranks can be attached to them
        options = [form.instance for form in vote_form]
        user.votes.add(*options)

        vote_of = {}
        for vote in Vote.objects.filter(user=user, option__in=options):
            vote_of[vote.option] = vote

        to_update, to_create = [], []
        for form in vote_form:
            vote = vote_of[form.instance]

            if not has_rank(vote):
                to_create.append(Rank(vote=vote, rank=form.cleaned_data["rank"]))
                continue

            vote.rank.rank = form.cleaned_data["rank"]
            to_update.append(vote.rank)

        Rank.objects.bulk_update(to_update, ["rank"])
        Rank.objects.bulk_create(to_create)
|
|
|
|
|
|
class TallyFunctions:
    """Tally the ballots of a question."""

    @staticmethod
    def tally_select(question: "Question") -> None:
        """Tally a plain selection vote.

        Stores the number of voters on each option, flags as winner every
        option reaching the highest count, and records that highest count in
        `question.max_votes`.
        """
        from .models import Option

        options = list(question.options.prefetch_related("voters").all())
        for option in options:
            option.nb_votes = option.voters.count()

        # 0 when the question has no option at all
        max_votes = max((option.nb_votes for option in options), default=0)
        for option in options:
            option.winner = option.nb_votes == max_votes

        question.max_votes = max_votes
        question.save()

        Option.objects.bulk_update(options, ["nb_votes", "winner"])

    @staticmethod
    def tally_schultze(question: "Question") -> None:
        """Tally a ranked vote and store the matrix of pairwise duels.

        On a ballot, option A beats option B when A's rank is strictly smaller
        than B's. The per-ballot results are summed into the duel matrix,
        whose positive cells are saved as `Duel` rows. The winners are the
        options still in the duel graph after the Schwartz elimination below.

        Nothing is written when the question has no ranked ballot.
        """
        from .models import Duel, Option, Rank

        options = list(question.options.all())
        nb_options = len(options)
        # Row/column of each option in the matrices, keyed by primary key
        index_of = {option.pk: k for k, option in enumerate(options)}

        ranks = Rank.objects.select_related("vote").filter(vote__option__in=options)

        # voter id -> {option position: rank}. Ranks are placed through the
        # option they belong to, so the result does not depend on the order in
        # which the database returns the rows.
        # NOTE(review): relies on `Vote.user` / `Vote.option` being foreign
        # keys (hence `user_id` / `option_id`) — confirm against the models.
        ranks_by_user = {}
        for r in ranks:
            position = index_of[r.vote.option_id]
            ranks_by_user.setdefault(r.vote.user_id, {})[position] = r.rank

        ballots: list[NDArray[np.int_]] = []

        # One 0/1 matrix per voter: ballot[i, j] == 1 iff option i is ranked
        # strictly before option j. Options a voter did not rank stay at 0.
        for user_ranks in ranks_by_user.values():
            ballot = np.zeros((nb_options, nb_options), dtype=int)

            for i, rank_i in user_ranks.items():
                for j, rank_j in user_ranks.items():
                    if rank_i < rank_j:
                        ballot[i, j] = 1

            ballots.append(ballot)

        if not ballots:
            return

        # Summing every ballot gives the duel matrix. `np.sum` always returns
        # an array here, so no truth-value check on the result is needed (such
        # a check raises on arrays with more than one element).
        duels = np.sum(ballots, axis=0)

        # Graph of strict pairwise victories, weighted by the margin
        graph = nx.DiGraph()
        graph.add_nodes_from(options)

        cells = []

        for i in range(nb_options):
            for j in range(nb_options):
                if duels[i, j] > duels[j, i]:
                    graph.add_edge(
                        options[i],
                        options[j],
                        weight=int(duels[i, j] - duels[j, i]),
                    )

                if duels[i, j] > 0:
                    cells.append(
                        Duel(
                            question=question,
                            winner=options[i],
                            loser=options[j],
                            amount=int(duels[i, j]),
                        )
                    )
        Duel.objects.bulk_create(cells)

        # Schwartz method: repeat until no edge is left
        while graph.edges():
            # Options that are beaten without beating back (strict losers)
            losers = set()
            for n in graph.nodes():
                losers |= set(descendants(graph, n)) - set(ancestors(graph, n))

            if losers:
                for n in losers:
                    graph.remove_node(n)
            else:
                # No strict loser: drop the edges with the smallest weight
                weights = nx.get_edge_attributes(graph, "weight")
                min_weight = min(weights.values())
                for (u, v), weight in weights.items():
                    if weight == min_weight:
                        graph.remove_edge(u, v)

        # The winners are the options still present in the graph
        winners = graph.nodes()
        for o in options:
            o.winner = o in winners

        Option.objects.bulk_update(options, ["winner"])
|
|
|
|
|
|
class ValidateFunctions:
    """Formset validators, one per kind of question."""

    @staticmethod
    def always_true(_) -> bool:
        """Accept any formset; used when no specific validation is needed."""
        return True

    @staticmethod
    def unique_selected(vote_form: "BaseFormSet[SelectVoteForm]") -> bool:
        """Check that exactly one option is selected.

        When none or several are selected, a non-form error is appended to
        the formset and False is returned.
        """
        nb_selected = 0
        for form in vote_form:
            nb_selected += form.cleaned_data["selected"]

        if nb_selected == 0:
            message = _("Vous devez sélectionnner une option.")
        elif nb_selected > 1:
            message = _("Vous ne pouvez pas sélectionner plus d'une option.")
        else:
            return True

        vote_form._non_form_errors.append(  # pyright: ignore
            ValidationError(message)
        )
        return False

    @staticmethod
    def limit_ranks(vote_form: "BaseFormSet[RankVoteForm]"):
        """Check that every rank lies between 1 and the number of options.

        A missing rank (None) defaults to the number of options. The rank,
        default included, is written back into each form's `cleaned_data`.
        Returns False as soon as one form is out of bounds, after adding an
        error on its `rank` field.
        """
        highest = len(vote_form)
        all_valid = True

        for form in vote_form:
            given = form.cleaned_data["rank"]
            rank = highest if given is None else given

            if rank > highest:
                form.add_error(
                    "rank", _("Le classement maximal est {}.").format(highest)
                )
                all_valid = False
            elif rank < 1:
                form.add_error("rank", _("Le classement minimal est 1."))
                all_valid = False

            # Store the value back so the default is recorded
            form.cleaned_data["rank"] = rank

        return all_valid
|
|
|
|
|
|
class ResultsData:
    """Extra information displayed once an election is over."""

    @staticmethod
    def select(_: "Question") -> str:
        """Render the template explaining the colours."""
        return render_to_string("elections/results/select.html")

    @staticmethod
    def rank(question: "Question") -> str:
        """Build the matrix of duel results and render it.

        Each cell is a `(data, css_class)` pair; the matrix is passed to the
        template as rows zipped with the option of that row.
        """
        stored_duels = question.duels.all()
        options = list(question.options.all())
        size = len(options)

        # Cells without a stored duel all share this default entry
        no_duel = {"value": 0}
        cells = [[no_duel] * size for _row in range(size)]

        # A duel is stored at [loser position][winner position]
        for duel in stored_duels:
            row, col = options.index(duel.loser), options.index(duel.winner)
            cells[row][col] = {
                "value": duel.amount,
                "winner": duel.winner.get_abbr(col + 1),
                "loser": duel.loser.get_abbr(row + 1),
            }

        def css_class(row, col):
            """Compare a cell with its mirrored cell to pick the colour class."""
            here = cells[row][col]["value"]
            mirrored = cells[col][row]["value"]
            if here > mirrored:
                return "is-success"
            if here < mirrored:
                return "is-danger"
            return ""

        rows = [
            [(cells[row][col], css_class(row, col)) for col in range(size)]
            for row in range(size)
        ]
        matrix = zip(rows, options)

        context = {"q": question, "matrix": matrix, "options": options}
        return render_to_string("elections/results/rank.html", context)
|
|
|
|
|
|
class BallotsData:
    """Display the ballots cast on a question."""

    @staticmethod
    def select(question: "Question") -> str:
        """Render a table of the options selected on each ballot.

        A ballot is a list of booleans, one per option, grouped by the
        vote's `pseudonymous_user`.
        """
        from .models import Vote

        votes = Vote.objects.filter(option__question=question)
        options = list(question.options.all())
        nb_options = len(options)

        ballots = {}
        for vote in votes:
            row = ballots.setdefault(vote.pseudonymous_user, [False] * nb_options)
            row[options.index(vote.option)] = True

        ordered = list(ballots.values())
        ordered.sort(reverse=True)

        return render_to_string(
            "elections/ballots/select.html",
            {"options": options, "ballots": ordered},
        )

    @staticmethod
    def rank(question: "Question") -> str:
        """Render a table of the ranks given on each ballot.

        Ranks are grouped by the vote's `pseudonymous_user`, in the order the
        query returns them.
        """
        from .models import Rank

        options = list(question.options.all())
        ranks = Rank.objects.select_related("vote").filter(vote__option__in=options)

        ranks_by_user = {}
        for r in ranks:
            ranks_by_user.setdefault(r.vote.pseudonymous_user, []).append(r.rank)

        ordered = list(ranks_by_user.values())
        ordered.sort()

        return render_to_string(
            "elections/ballots/rank.html",
            {"options": options, "ballots": ordered},
        )
|
|
|
|
|
|
# #############################################################################
|
|
# Fonctions pour importer une liste de votant·e·s
|
|
# #############################################################################
|
|
|
|
|
|
def create_users(election: "Election", csv_file: File):
    """Create the voters of `election` from a CSV file.

    Every row must hold exactly three columns, in this order: login, full
    name, email. The fields `username` (prefixed with the election's primary
    key), `election`, `email` and `full_name` are filled, and all users are
    inserted with a single `bulk_create`.

    The file is always read from its beginning, whatever its current
    position, and is decoded as UTF-8.

    Raises:
        csv.Error: if the CSV dialect cannot be determined.
        UnicodeDecodeError: if the file is not valid UTF-8.
        ValueError: if a row does not have exactly three columns.
    """
    User = get_user_model()

    # Rewind before reading: the file may already have been consumed by an
    # earlier reader, in which case sniffing would see an empty first line.
    csv_file.seek(0)
    content = csv_file.read().decode("utf-8")

    # The dialect is detected on the first line only (newline included)
    first_line = "".join(content.partition("\n")[:2])
    dialect = csv.Sniffer().sniff(first_line)
    reader = csv.reader(io.StringIO(content), dialect)

    users = [
        User(
            election=election,
            username=f"{election.pk}__{username}",
            email=email,
            full_name=full_name,
        )
        for username, full_name, email in reader
    ]

    User.objects.bulk_create(users)
|
|
|
|
|
|
def check_csv(csv_file: File):
    """Check that the CSV file listing the voters is well formed.

    The file is always read from its beginning and decoded as UTF-8. Each row
    must hold three columns: a non-empty and unique login, a non-empty name
    and a valid email address.

    Returns:
        A list of error messages, empty when the file is valid. A file whose
        dialect cannot be detected, or which is not valid UTF-8, yields a
        single "invalid format" message.
    """
    # Rewind first so the check does not depend on the current file position
    csv_file.seek(0)
    try:
        content = csv_file.read().decode("utf-8")
        # The dialect is detected on the first line only (newline included)
        first_line = "".join(content.partition("\n")[:2])
        dialect = csv.Sniffer().sniff(first_line)
    except (csv.Error, UnicodeDecodeError):
        return [
            _(
                "Format invalide. Vérifiez que le fichier est bien formé (i.e. "
                "chaque ligne de la forme 'login,nom,email')."
            )
        ]
    reader = csv.reader(io.StringIO(content), dialect)

    errors = []
    # login -> number of the first line where it appears
    users = {}
    for line_nb, line in enumerate(reader, start=1):
        if len(line) != 3:
            errors.append(
                _("La ligne {} n'a pas le bon nombre d'éléments.").format(line_nb)
            )
            continue

        login, name, email = line

        if login == "":
            errors.append(
                _("Valeur manquante dans la ligne {} : 'login'.").format(line_nb)
            )
        elif login in users:
            errors.append(
                _("Doublon dans les logins : lignes {} et {}.").format(
                    line_nb, users[login]
                )
            )
        else:
            users[login] = line_nb

        if name == "":
            errors.append(
                _("Valeur manquante dans la ligne {} : 'nom'.").format(line_nb)
            )

        try:
            validate_email(email)
        except ValidationError:
            errors.append(
                _("Adresse mail invalide à la ligne {} : '{}'.").format(
                    line_nb, email
                )
            )

    return errors
|
|
|
|
|
|
def send_mail(election: "Election", subject: str, body: str, reply_to: str) -> None:
    """Send the election announcement, with login and password, to the voters.

    Only registered voters whose `has_valid_email` is not True are contacted.
    A new password is generated for each of them while their message is
    prepared. All messages are built before any is sent; after each send
    attempt the voter is saved with `has_valid_email` set to whether the
    attempt succeeded.
    """
    # Only people who have not already received a mail are contacted
    recipients = list(election.registered_voters.exclude(has_valid_email=True))

    e_url = reverse("election.view", args=[election.pk])
    url = f"https://vote.eleves.ens.fr{e_url}"
    date_format = "%d/%m/%Y %H:%M %Z"
    start = election.start_date.strftime(date_format)
    end = election.end_date.strftime(date_format)

    outbox = []
    for voter in recipients:
        clear_password = generate_password()
        voter.password = make_password(clear_password)

        text = body.format(
            full_name=voter.full_name,
            election_url=url,
            start=start,
            end=end,
            username=voter.base_username,
            password=clear_password,
        )
        mail = EmailMessage(
            subject=subject,
            body=text,
            to=[voter.email],
            reply_to=[reply_to],
            # Original note: this changes the error-return address
            headers={"From": "Kadenios <klub-dev@ens.fr>"},
        )
        outbox.append((voter, mail))

    for voter, mail in outbox:
        try:
            mail.send()
        except smtplib.SMTPException:
            delivered = False
        else:
            delivered = True

        voter.has_valid_email = delivered
        voter.save()
|