"""Views handling private-lesson ("petits cours") requests.

Covers the public request forms, the member registration form, and the
staff-only workflow that proposes tutors for a request and sends the
corresponding e-mails.
"""

import json

from custommail.shortcuts import render_custom_mail
from django.conf import settings
from django.contrib import messages
from django.contrib.auth.decorators import login_required
from django.contrib.auth.models import User
from django.core import mail
from django.db import transaction
from django.shortcuts import get_object_or_404, redirect, render
from django.utils import timezone
from django.views.decorators.csrf import csrf_exempt
from django.views.generic import DetailView, ListView

from gestioncof.decorators import buro_required
from gestioncof.models import CofProfile
from gestioncof.petits_cours_forms import DemandeForm, MatieresFormSet
from gestioncof.petits_cours_models import (
    PetitCoursAbility,
    PetitCoursAttribution,
    PetitCoursAttributionCounter,
    PetitCoursDemande,
)

# Maximum number of tutors proposed per subject in the standard workflow.
_MAX_PROPOSALS = 3


class DemandeListView(ListView):
    """Paginated list of requests, unprocessed ones first, newest first."""

    queryset = PetitCoursDemande.objects.prefetch_related("matieres").order_by(
        "traitee", "-id"
    )
    template_name = "petits_cours_demandes_list.html"
    paginate_by = 20


class DemandeDetailView(DetailView):
    """Detail page of a single request, including its attributions."""

    model = PetitCoursDemande
    queryset = PetitCoursDemande.objects.prefetch_related(
        "petitcoursattribution_set", "matieres"
    )
    template_name = "gestioncof/details_demande_petit_cours.html"
    context_object_name = "demande"

    def get_context_data(self, **kwargs):
        """Add the request's attributions to the template context."""
        context = super().get_context_data(**kwargs)
        context["attributions"] = self.object.petitcoursattribution_set.all()
        return context


def _users_by_attribution_count(matiere, candidates):
    """Return the candidates' users sorted by ascending attribution count.

    The sort is stable and keyed on the count only, so candidates with equal
    counts keep the order in which they were supplied.
    """
    counted = [
        (
            candidate.user,
            PetitCoursAttributionCounter.get_uptodate(candidate.user, matiere),
        )
        for candidate in candidates
    ]
    counted.sort(key=lambda pair: pair[1].count)
    return [user for user, _counter in counted]


def _propose_by_count(demande, redo, limit=None):
    """Build automatic proposals for every subject of ``demande``.

    Args:
        demande: the request being processed.
        redo: forwarded unchanged to ``demande.get_candidates``.
        limit: maximum number of users kept per subject; ``None`` keeps all.

    Returns:
        A ``(proposals, proposed_for, unsatisfied, attribdata)`` tuple where
        ``proposals`` maps each subject to its proposed users, ``proposed_for``
        maps each user to the subjects they are proposed for, ``unsatisfied``
        lists subjects without any candidate, and ``attribdata`` maps subject
        ids to lists of proposed user ids.
    """
    proposals = {}
    proposed_for = {}
    unsatisfied = []
    attribdata = {}
    for matiere, candidates in demande.get_candidates(redo):
        if not candidates:
            unsatisfied.append(matiere)
            continue
        users = _users_by_attribution_count(matiere, candidates)[:limit]
        proposals[matiere] = users
        attribdata[matiere.id] = [user.id for user in users]
        for user in users:
            proposed_for.setdefault(user, []).append(matiere)
    return proposals, proposed_for, unsatisfied, attribdata


@buro_required
def traitement(request, demande_id, redo=False):
    """Process a request: show the proposal page, or handle its submission.

    Requests whose ``niveau`` is ``"other"`` are delegated to the dedicated
    manual-selection workflow.
    """
    demande = get_object_or_404(PetitCoursDemande, id=demande_id)
    if demande.niveau == "other":
        return _traitement_other(request, demande, redo)
    if request.method == "POST":
        return _traitement_post(request, demande)
    proposals, proposed_for, unsatisfied, attribdata = _propose_by_count(
        demande, redo, limit=_MAX_PROPOSALS
    )
    return _finalize_traitement(
        request, demande, proposals, proposed_for, unsatisfied, attribdata, redo
    )


@buro_required
def retraitement(request, demande_id):
    """Process a request again (same as ``traitement`` with ``redo=True``)."""
    return traitement(request, demande_id, redo=True)


def _finalize_traitement(
    request,
    demande,
    proposals,
    proposed_for,
    unsatisfied,
    attribdata,
    redo=False,
    errors=None,
):
    """Render the confirmation page previewing the e-mails to be sent.

    Args:
        request: the current HTTP request.
        demande: the request being processed.
        proposals: dict mapping each subject to its proposed users.
        proposed_for: dict mapping each user to the subjects proposed to them.
        unsatisfied: subjects for which nobody could be proposed.
        attribdata: dict mapping subject ids to lists of user ids; it is
            serialised to JSON for the template (``_traitement_post`` reads
            an ``attribdata`` POST field in that format).
        redo: passed through to the template.
        errors: optional iterable of messages reported via ``messages.error``.
    """
    proposals = proposals.items()
    proposed_for = proposed_for.items()
    attribdata = list(attribdata.items())
    proposed_mails = _generate_eleve_email(demande, proposed_for)
    mainmail = render_custom_mail(
        "petits-cours-mail-demandeur",
        {
            "proposals": proposals,
            "unsatisfied": unsatisfied,
            # NOTE(review): this literal was garbled in the file under review
            # (its markup had been stripped). It is reconstructed as a
            # free-text field named "extra", the POST key that
            # _traitement_post reads -- confirm the exact attributes against
            # version control.
            "extra": '<textarea name="extra" '
            'style="width:99%; height: 90px;">'
            "</textarea>",
        },
    )
    for error in errors or ():
        messages.error(request, error)
    return render(
        request,
        "gestioncof/traitement_demande_petit_cours.html",
        {
            "demande": demande,
            "unsatisfied": unsatisfied,
            "proposals": proposals,
            "proposed_for": proposed_for,
            "proposed_mails": proposed_mails,
            "mainmail": mainmail,
            "attribdata": json.dumps(attribdata),
            "redo": redo,
        },
    )


def _generate_eleve_email(demande, proposed_for):
    """Return ``(user, rendered_mail)`` pairs, one per proposed user.

    ``proposed_for`` is an iterable of ``(user, matieres)`` pairs.
    """
    return [
        (
            user,
            render_custom_mail(
                "petit-cours-mail-eleve", {"demande": demande, "matieres": matieres}
            ),
        )
        for user, matieres in proposed_for
    ]


def _traitement_other_preparing(request, demande):
    """Validate the manually chosen proposals and show the confirmation page.

    For each subject, up to ``_MAX_PROPOSALS`` POST fields named
    ``proposal-<matiere id>-<rank>`` are read; each holds a candidate user id,
    or ``-1`` for "no choice". Problems are collected and reported as error
    messages instead of aborting.
    """
    redo = "redo" in request.POST
    unsatisfied = []
    proposals = {}
    proposed_for = {}
    attribdata = {}
    errors = []
    for matiere, candidates in demande.get_candidates(redo):
        if not candidates:
            unsatisfied.append(matiere)
            continue
        users_by_id = {candidate.user.id: candidate.user for candidate in candidates}
        chosen = proposals[matiere] = []
        chosen_ids = attribdata[matiere.id] = []
        for choice_id in range(min(_MAX_PROPOSALS, len(users_by_id))):
            field = "proposal-{:d}-{:d}".format(matiere.id, choice_id)
            invalid_choice = "Choix invalide pour la proposition {:d} en {!s}".format(
                choice_id + 1, matiere
            )
            try:
                choice = int(request.POST[field])
            except (KeyError, ValueError):
                # A missing or non-numeric field is reported like any other
                # invalid choice rather than crashing the view.
                errors.append(invalid_choice)
                continue
            if choice == -1:
                continue
            if choice not in users_by_id:
                errors.append(invalid_choice)
                continue
            user = users_by_id[choice]
            if user in chosen:
                errors.append(
                    "La proposition {:d} en {!s} est un doublon".format(
                        choice_id + 1, matiere
                    )
                )
                continue
            chosen.append(user)
            chosen_ids.append(user.id)
            proposed_for.setdefault(user, []).append(matiere)
        if not chosen:
            errors.append("Aucune proposition pour {!s}".format(matiere))
        elif len(chosen) < _MAX_PROPOSALS:
            errors.append(
                "Seulement {:d} proposition{:s} pour {!s}".format(
                    len(chosen),
                    "s" if len(chosen) > 1 else "",
                    matiere,
                )
            )
    return _finalize_traitement(
        request,
        demande,
        proposals,
        proposed_for,
        unsatisfied,
        attribdata,
        redo=redo,
        errors=errors,
    )


def _traitement_other(request, demande, redo):
    """Handle a request of level ``"other"``.

    On GET, every candidate (sorted by attribution count, without limit) is
    listed so that staff can choose manually. On POST, the ``preparing`` field
    selects between validating the manual choice and the final submission.
    """
    if request.method == "POST":
        if "preparing" in request.POST:
            return _traitement_other_preparing(request, demande)
        return _traitement_post(request, demande)
    proposals, proposed_for, unsatisfied, _attribdata = _propose_by_count(
        demande, redo
    )
    return render(
        request,
        "gestioncof/traitement_demande_petit_cours_autre_niveau.html",
        {
            "demande": demande,
            "unsatisfied": unsatisfied,
            "proposals": proposals.items(),
            "proposed_for": proposed_for.items(),
        },
    )


def _traitement_post(request, demande):
    """Send the e-mails and record the attributions for ``demande``.

    Reads the POST fields ``extra`` (free text inserted into the requester's
    mail) and ``attribdata`` (JSON list of ``[matiere_id, [user_id, ...]]``
    pairs, as produced by ``_finalize_traitement``); the presence of ``redo``
    is passed on to the success template.
    """
    proposals = {}
    proposed_for = {}
    unsatisfied = []
    extra = request.POST["extra"].strip()
    redo = "redo" in request.POST
    attribdata = dict(json.loads(request.POST["attribdata"]))
    for matiere in demande.matieres.all():
        if matiere.id not in attribdata:
            unsatisfied.append(matiere)
            continue
        proposals[matiere] = []
        for user_id in attribdata[matiere.id]:
            user = User.objects.get(pk=user_id)
            proposals[matiere].append(user)
            proposed_for.setdefault(user, []).append(matiere)

    proposed_mails = _generate_eleve_email(demande, proposed_for.items())
    mainmail_object, mainmail_body = render_custom_mail(
        "petits-cours-mail-demandeur",
        {"proposals": proposals.items(), "unsatisfied": unsatisfied, "extra": extra},
    )

    mail_settings = settings.MAIL_DATA["petits_cours"]
    frommail = mail_settings["FROM"]
    bccaddress = mail_settings["BCC"]
    replyto = mail_settings["REPLYTO"]

    mails_to_send = [
        mail.EmailMessage(
            mail_object,
            body,
            frommail,
            to=[user.email],
            bcc=[bccaddress],
            headers={"Reply-To": replyto},
        )
        for user, (mail_object, body) in proposed_mails
    ]
    mails_to_send.append(
        mail.EmailMessage(
            mainmail_object,
            mainmail_body,
            frommail,
            to=[demande.email],
            bcc=[bccaddress],
            headers={"Reply-To": replyto},
        )
    )
    # NOTE(review): mails are sent before the database is updated, so a
    # failure in the transaction below leaves mails sent without any recorded
    # attribution. The original ordering is kept on purpose.
    connection = mail.get_connection(fail_silently=False)
    connection.send_messages(mails_to_send)

    with transaction.atomic():
        for matiere, users in proposals.items():
            for rank, user in enumerate(users, start=1):
                # TODO(AD): Prefer PetitCoursAttributionCounter.get_uptodate()
                counter = PetitCoursAttributionCounter.objects.get(
                    user=user, matiere=matiere
                )
                counter.count += 1
                counter.save()
                PetitCoursAttribution(
                    user=user, matiere=matiere, demande=demande, rank=rank
                ).save()
        # NOTE(review): the original indentation was lost; these updates are
        # assumed to belong to the same atomic block -- confirm.
        demande.traitee = True
        demande.traitee_par = request.user
        demande.processed = timezone.now()
        demande.save()
    return render(
        request,
        "gestioncof/traitement_demande_petit_cours_success.html",
        {"demande": demande, "redo": redo},
    )


@login_required
def inscription(request):
    """Let a member edit the subjects they teach and their preferences.

    Users whose profile has a falsy ``is_cof`` are redirected to
    ``cof-denied``.
    """
    profile, _created = CofProfile.objects.get_or_create(user=request.user)
    if not profile.is_cof:
        return redirect("cof-denied")
    success = False
    if request.method == "POST":
        formset = MatieresFormSet(request.POST, instance=request.user)
        if formset.is_valid():
            formset.save()
            profile.petits_cours_accept = "receive_proposals" in request.POST
            # A missing field is treated as empty remarks instead of raising
            # after the formset has already been saved.
            profile.petits_cours_remarques = request.POST.get("remarques", "")
            profile.save()
            with transaction.atomic():
                # Called for each of the user's abilities; the return value
                # is deliberately ignored.
                for ability in PetitCoursAbility.objects.filter(user=request.user):
                    PetitCoursAttributionCounter.get_uptodate(
                        ability.user, ability.matiere
                    )
            success = True
            # Rebuild an unbound formset so the page shows the saved state.
            formset = MatieresFormSet(instance=request.user)
    else:
        formset = MatieresFormSet(instance=request.user)
    return render(
        request,
        "inscription-petit-cours.html",
        {
            "formset": formset,
            "success": success,
            "receive_proposals": profile.petits_cours_accept,
            "remarques": profile.petits_cours_remarques,
        },
    )


def _handle_demande_form(request, template_name):
    """Display or process the request form and render ``template_name``.

    On a valid POST the form is saved and ``success`` is set in the context;
    an invalid POST re-renders the bound form.
    """
    success = False
    if request.method == "POST":
        form = DemandeForm(request.POST)
        if form.is_valid():
            form.save()
            success = True
    else:
        form = DemandeForm()
    return render(request, template_name, {"form": form, "success": success})


@csrf_exempt
def demande(request):
    """Public request form, rendered with ``demande-petit-cours.html``."""
    return _handle_demande_form(request, "demande-petit-cours.html")


@csrf_exempt
def demande_raw(request):
    """Public request form, rendered with ``demande-petit-cours-raw.html``."""
    return _handle_demande_form(request, "demande-petit-cours-raw.html")