kadenios/elections/utils.py

428 lines
14 KiB
Python

import csv
import io
import random
import networkx as nx
import numpy as np
from networkx.algorithms.dag import ancestors, descendants
from django.contrib.auth.hashers import make_password
from django.core.exceptions import ValidationError
from django.core.mail import EmailMessage, get_connection
from django.core.validators import validate_email
from django.template.loader import render_to_string
from django.urls import reverse
from django.utils.translation import gettext_lazy as _
# #############################################################################
# Fonctions universelles
# #############################################################################
def choices_length(choices):
"""Renvoie la longueur maximale des choix de choices"""
m = 0
for c in choices:
m = max(m, len(c[0]))
return m
# #############################################################################
# Classes pour différencier les différents types de questions
# #############################################################################
class CastFunctions:
"""Classe pour enregistrer les votes"""
def cast_select(user, vote_form):
"""On enregistre un vote classique"""
selected, n_selected = [], []
for v in vote_form:
if v.cleaned_data["selected"]:
selected.append(v.instance)
else:
n_selected.append(v.instance)
user.votes.add(*selected)
user.votes.remove(*n_selected)
def cast_rank(user, vote_form):
"""On enregistre un vote par classement"""
from .models import Rank, Vote
# On enregistre les votes pour pouvoir créér les classements
options = [v.instance for v in vote_form]
user.votes.add(*options)
votes = {
v.option: v for v in Vote.objects.filter(user=user, option__in=options)
}
ranks_create = []
ranks_update = []
for v in vote_form:
vote = votes[v.instance]
if hasattr(vote, "rank"):
vote.rank.rank = v.cleaned_data["rank"]
ranks_update.append(vote.rank)
else:
ranks_create.append(Rank(vote=vote, rank=v.cleaned_data["rank"]))
Rank.objects.bulk_update(ranks_update, ["rank"])
Rank.objects.bulk_create(ranks_create)
class TallyFunctions:
"""Classe pour gérer les dépouillements"""
def tally_select(question):
"""On dépouille un vote classique"""
from .models import Option
max_votes = 0
options = []
for o in question.options.prefetch_related("voters").all():
o.nb_votes = o.voters.count()
max_votes = max(max_votes, o.nb_votes)
options.append(o)
for o in options:
o.winner = o.nb_votes == max_votes
question.max_votes = max_votes
question.save()
Option.objects.bulk_update(options, ["nb_votes", "winner"])
def tally_schultze(question):
"""On dépouille un vote par classement et on crée la matrice des duels"""
from .models import Duel, Option, Rank
options = list(question.options.all())
nb_options = len(options)
ranks = Rank.objects.select_related("vote").filter(vote__option__in=options)
ranks_by_user = {}
for r in ranks:
user = r.vote.user
if user in ranks_by_user:
ranks_by_user[user].append(r)
else:
ranks_by_user[user] = [r]
ballots = []
# Pour chaque votant·e, on regarde son classement
for user in ranks_by_user:
votes = ranks_by_user[user]
ballot = np.zeros((nb_options, nb_options))
for i in range(nb_options):
for j in range(i):
ballot[i, j] = int(votes[i].rank < votes[j].rank)
ballot[j, i] = int(votes[j].rank < votes[i].rank)
ballots.append(ballot)
if ballots != []:
# On additionne les classements de tout le monde pour avoir la matrice
# des duels
duels = sum(ballots)
# Configuration du graphe
graph = nx.DiGraph()
graph.add_nodes_from(options)
cells = []
# On enregistre la matrice
for i in range(nb_options):
for j in range(nb_options):
if duels[i, j] > duels[j, i]:
graph.add_edge(
options[i], options[j], weight=(duels[i, j] - duels[j, i])
)
if duels[i, j] > 0:
cells.append(
Duel(
question=question,
winner=options[i],
loser=options[j],
amount=duels[i, j],
)
)
Duel.objects.bulk_create(cells)
# On utilise la méthode de Schwartz pour trouver les options gagnantes
while graph.edges():
losers = set() # Les options qui se font battre strictement
for n in graph.nodes():
losers |= set(descendants(graph, n)) - set(ancestors(graph, n))
if losers:
# On supprime les options perdantes
for n in losers:
graph.remove_node(n)
else:
# On n'a pas d'options perdantes, on supprime les arêtes de poids
# le plus faible
min_weight = min(nx.get_edge_attributes(graph, "weight").values())
min_edges = []
for (u, v) in graph.edges():
if graph[u][v]["weight"] == min_weight:
min_edges.append((u, v))
for (u, v) in min_edges:
graph.remove_edge(u, v)
# Les options gagnantes sont celles encore présentes dans le graphe
winners = graph.nodes()
for o in options:
o.winner = o in winners
Option.objects.bulk_update(options, ["winner"])
class ValidateFunctions:
"""Classe pour valider les formsets selon le type de question"""
def always_true(vote_form):
"""Retourne True pour les votes sans validation particulière"""
return True
def unique_selected(vote_form):
"""Vérifie qu'une seule option est choisie"""
nb_selected = 0
for v in vote_form:
nb_selected += v.cleaned_data["selected"]
if nb_selected == 0:
vote_form._non_form_errors.append(
ValidationError(_("Vous devez sélectionnner une option."))
)
return False
elif nb_selected > 1:
vote_form._non_form_errors.append(
ValidationError(_("Vous ne pouvez pas sélectionner plus d'une option."))
)
return False
return True
def limit_ranks(vote_form):
"""Limite le classement au nombre d'options"""
nb_options = len(vote_form)
valid = True
for v in vote_form:
rank = v.cleaned_data["rank"]
if rank is None:
rank = nb_options
if rank > nb_options:
v.add_error(
"rank", _("Le classement maximal est {}.").format(nb_options)
)
valid = False
elif rank < 1:
v.add_error("rank", _("Le classement minimal est 1."))
valid = False
v.cleaned_data["rank"] = rank # On ajoute le défaut
return valid
class ResultsData:
"""Classe pour afficher des informations supplémentaires après la fin d'une élection"""
def select(question):
"""On renvoie l'explication des couleurs"""
return render_to_string("elections/results/select.html")
def rank(question):
"""On récupère la matrice des résultats et on l'affiche"""
duels = question.duels.all()
options = list(question.options.all())
n = len(options)
_matrix = np.zeros((n, n), dtype=int)
matrix = np.zeros((n, n), dtype=tuple)
for d in duels:
i, j = options.index(d.loser), options.index(d.winner)
_matrix[i, j] = d.amount
for i in range(n):
for j in range(n):
if _matrix[i, j] > _matrix[j, i]:
matrix[i, j] = (_matrix[i, j], "is-success")
elif _matrix[i, j] < _matrix[j, i]:
matrix[i, j] = (_matrix[i, j], "is-danger")
else:
matrix[i, j] = (_matrix[i, j], "")
matrix = zip(matrix.tolist(), options)
return render_to_string(
"elections/results/rank.html",
{"q": question, "matrix": matrix, "options": options},
)
class BallotsData:
"""Classe pour afficher les bulletins d'une question"""
def select(question):
"""Renvoie un tableau affichant les options sélectionnées pour chaque bulletin"""
from .models import Vote
votes = Vote.objects.filter(option__question=question).select_related("user")
options = list(question.options.all())
ballots = {}
for v in votes:
ballot = ballots.get(v.user, [False] * len(options))
ballot[options.index(v.option)] = True
ballots[v.user] = ballot
return render_to_string(
"elections/ballots/select.html", {"options": options, "ballots": ballots}
)
def rank(question):
"""Renvoie un tableau contenant les classements des options par bulletin"""
from .models import Rank
options = list(question.options.all())
ranks = Rank.objects.select_related("vote__user").filter(
vote__option__in=options
)
ranks_by_user = {}
for r in ranks:
user = r.vote.user
if user in ranks_by_user:
ranks_by_user[user].append(r.rank)
else:
ranks_by_user[user] = [r.rank]
return render_to_string(
"elections/ballots/rank.html",
{"options": options, "ballots": ranks_by_user},
)
# #############################################################################
# Fonctions pour importer une liste de votant·e·s
# #############################################################################
def create_users(election, csv_file):
"""Crée les votant·e·s pour l'élection donnée, en remplissant les champs
`username`, `election` et `full_name`.
"""
dialect = csv.Sniffer().sniff(csv_file.readline().decode("utf-8"))
csv_file.seek(0)
reader = csv.reader(io.StringIO(csv_file.read().decode("utf-8")), dialect)
for (username, full_name, email) in reader:
election.registered_voters.create(
username=f"{election.id}__{username}", email=email, full_name=full_name
)
def check_csv(csv_file):
"""Vérifie que le fichier donnant la liste de votant·e·s est bien formé"""
try:
dialect = csv.Sniffer().sniff(csv_file.readline().decode("utf-8"))
except csv.Error:
return [
_(
"Format invalide. Vérifiez que le fichier est bien formé (i.e. "
"chaque ligne de la forme 'login,nom,email')."
)
]
csv_file.seek(0)
reader = csv.reader(io.StringIO(csv_file.read().decode("utf-8")), dialect)
errors = []
users = {}
line_nb = 0
for line in reader:
line_nb += 1
if len(line) != 3:
errors.append(
_("La ligne {} n'a pas le bon nombre d'éléments.").format(line_nb)
)
else:
if line[0] == "":
errors.append(
_("Valeur manquante dans la ligne {} : 'login'.").format(line_nb)
)
else:
if line[0] in users:
errors.append(
_("Doublon dans les logins : lignes {} et {}.").format(
line_nb, users[line[0]]
)
)
else:
users[line[0]] = line_nb
if line[1] == "":
errors.append(
_("Valeur manquante dans la ligne {} : 'nom'.").format(line_nb)
)
try:
validate_email(line[2])
except ValidationError:
errors.append(
_("Adresse mail invalide à la ligne {} : '{}'.").format(
line_nb, line[2]
)
)
return errors
def generate_password():
random.seed()
alphabet = "abcdefghjkmnopqrstuvwxyzABCDEFGHJKLMNPQRSTUVWXYZ23456789"
password = ""
for i in range(15):
password += random.choice(alphabet)
return password
def send_mail(election, mail_form):
"""Envoie le mail d'annonce de l'élection avec identifiants et mot de passe
aux votant·e·s, le mdp est généré en même temps que le mail est envoyé.
"""
from .models import User
voters = list(election.registered_voters.all())
e_url = reverse("election.view", args=[election.id])
url = f"https://vote.eleves.ens.fr/{e_url}"
messages = []
for v in voters:
password = generate_password()
v.password = make_password(password)
messages.append(
EmailMessage(
subject=mail_form.cleaned_data["objet"],
body=mail_form.cleaned_data["message"].format(
full_name=v.full_name,
election_url=url,
username=v.get_username(),
password=password,
),
to=[v.email],
)
)
get_connection(fail_silently=False).send_messages(messages)
User.objects.bulk_update(voters, ["password"])