kpsul/kfet/views.py

2865 lines
96 KiB
Python

import heapq
import statistics
from collections import defaultdict
from datetime import datetime, timedelta
from decimal import Decimal
from typing import List, Tuple
from urllib.parse import urlencode
from django.conf import settings
from django.contrib import messages
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.mixins import PermissionRequiredMixin
from django.contrib.auth.models import Permission, User
from django.contrib.messages.views import SuccessMessageMixin
from django.core.exceptions import SuspiciousOperation
from django.core.mail import EmailMessage
from django.db import transaction
from django.db.models import (
Count,
DecimalField,
ExpressionWrapper,
F,
Max,
OuterRef,
Prefetch,
Q,
Subquery,
Sum,
)
from django.forms import ValidationError, formset_factory
from django.http import (
Http404,
HttpResponseBadRequest,
HttpResponseForbidden,
JsonResponse,
)
from django.shortcuts import get_object_or_404, redirect, render
from django.template import loader
from django.urls import reverse, reverse_lazy
from django.utils import timezone
from django.utils.decorators import method_decorator
from django.views.generic import DetailView, FormView, ListView, TemplateView
from django.views.generic.detail import BaseDetailView
from django.views.generic.edit import CreateView, DeleteView, UpdateView
from gestioncof.models import CofProfile
from kfet import KFET_DELETED_TRIGRAMME, consumers
from kfet.auth.decorators import kfet_password_auth
from kfet.autocomplete import kfet_account_only_autocomplete, kfet_autocomplete
from kfet.config import kfet_config
from kfet.decorators import teamkfet_required
from kfet.forms import (
AccountForm,
AccountFrozenForm,
AccountNoTriForm,
AccountPwdForm,
AccountStatForm,
AccountTriForm,
AddcostForm,
ArticleForm,
ArticleRestrictForm,
CategoryForm,
CheckoutForm,
CheckoutRestrictForm,
CheckoutStatementCreateForm,
CheckoutStatementUpdateForm,
CofForm,
ContactForm,
DemandeSoireeForm,
FilterHistoryForm,
InventoryArticleForm,
KFetConfigForm,
KPsulAccountForm,
KPsulCheckoutForm,
KPsulOperationFormSet,
KPsulOperationGroupForm,
OrderArticleForm,
OrderArticleToInventoryForm,
StatScaleForm,
TransferFormSet,
UserForm,
UserGroupForm,
UserInfoForm,
)
from kfet.models import (
Account,
AccountNegative,
Article,
ArticleCategory,
Checkout,
CheckoutStatement,
Inventory,
InventoryArticle,
Operation,
OperationGroup,
Order,
OrderArticle,
Supplier,
SupplierArticle,
Transfer,
TransferGroup,
)
from kfet.statistic import SCALE_DICT, DayScale, MonthScale, WeekScale, scale_url_params
from shared.views import AutocompleteView
from .auth import KFET_GENERIC_TRIGRAMME
from .auth.views import ( # noqa
AccountGroupCreate,
AccountGroupUpdate,
account_group,
login_generic,
)
def put_cleaned_data_in_dict(dict, form):
for field in form.cleaned_data:
dict[field] = form.cleaned_data[field]
class ContactView(FormView):
template_name = "kfet/contact.html"
form_class = ContactForm
success_url = reverse_lazy("kfet.contact")
def form_valid(self, form):
# Envoie un mail lorsque le formulaire est valide
EmailMessage(
form.cleaned_data["subject"],
form.cleaned_data["message"],
from_email=form.cleaned_data["from_email"],
to=("chefs-k-fet@ens.psl.eu",),
).send()
messages.success(
self.request,
"Votre message a bien été envoyé aux Wo·men K-Fêt.",
)
return super().form_valid(form)
class DemandeSoireeView(FormView):
template_name = "kfet/demande_soiree.html"
form_class = DemandeSoireeForm
success_url = reverse_lazy("kfet.demande-soiree")
def form_valid(self, form):
destinataires = ["chefs-k-fet@ens.psl.eu"]
if form.cleaned_data["contact_boum"]:
destinataires.append("boum@ens.psl.eu")
if form.cleaned_data["contact_pls"]:
destinataires.append("pls@ens.psl.eu")
# Envoie un mail lorsque le formulaire est valide
EmailMessage(
f"Demande de soirée le {form.cleaned_data['date']}",
loader.render_to_string(
"kfet/mails/demande_soiree.txt", context=form.cleaned_data
),
from_email=form.cleaned_data["from_email"],
to=destinataires,
cc=[form.cleaned_data["from_email"]],
).send()
messages.success(
self.request,
"Votre demande de soirée a bien été envoyée.",
)
return super().form_valid(form)
# -----
# Account views
# -----
# Account - General
@login_required
@teamkfet_required
def account(request):
accounts = Account.objects.select_related("cofprofile__user").order_by("trigramme")
return render(request, "kfet/account.html", {"accounts": accounts})
@login_required
@teamkfet_required
def account_is_validandfree_ajax(request):
if not request.GET.get("trigramme", ""):
raise Http404
trigramme = request.GET.get("trigramme")
data = Account.is_validandfree(trigramme)
return JsonResponse(data)
# Account - Create
@login_required
@teamkfet_required
@kfet_password_auth
def account_create(request):
# Enregistrement
if request.method == "POST":
trigramme_form = AccountTriForm(request.POST)
# Peuplement des forms
username = request.POST.get("username")
login_clipper = request.POST.get("login_clipper")
forms = get_account_create_forms(
request, username=username, login_clipper=login_clipper
)
account_form = forms["account_form"]
cof_form = forms["cof_form"]
user_form = forms["user_form"]
if all(
(
user_form.is_valid(),
cof_form.is_valid(),
trigramme_form.is_valid(),
account_form.is_valid(),
)
):
# Checking permission
if not request.user.has_perm("kfet.add_account"):
messages.error(
request, "Permission refusée", extra_tags="permission-denied"
)
else:
data = {}
# Fill data for Account.save()
put_cleaned_data_in_dict(data, user_form)
put_cleaned_data_in_dict(data, cof_form)
try:
account = trigramme_form.save(data=data)
account_form = AccountNoTriForm(request.POST, instance=account)
account_form.save()
messages.success(request, "Compte créé : %s" % account.trigramme)
return redirect("kfet.account.create")
except Account.UserHasAccount as e:
messages.error(
request,
"Cet utilisateur a déjà un compte K-Fêt : %s" % e.trigramme,
)
else:
initial = {"trigramme": request.GET.get("trigramme", "")}
trigramme_form = AccountTriForm(initial=initial)
account_form = None
cof_form = None
user_form = None
return render(
request,
"kfet/account_create.html",
{
"trigramme_form": trigramme_form,
"account_form": account_form,
"cof_form": cof_form,
"user_form": user_form,
},
)
def account_form_set_readonly_fields(user_form, cof_form):
user_form.fields["username"].widget.attrs["readonly"] = True
user_form.fields["first_name"].widget.attrs["readonly"] = True
user_form.fields["last_name"].widget.attrs["readonly"] = True
user_form.fields["email"].widget.attrs["readonly"] = True
cof_form.fields["login_clipper"].widget.attrs["readonly"] = True
cof_form.fields["departement"].widget.attrs["readonly"] = True
cof_form.fields["is_cof"].widget.attrs["disabled"] = True
def get_account_create_forms(
request=None, username=None, login_clipper=None, fullname=None
):
user = None
clipper = False
if login_clipper and (login_clipper == username or not username):
# à partir d'un clipper
# le user associé à ce clipper ne devrait pas encore exister
clipper = True
try:
# Vérification que clipper ne soit pas déjà dans User
user = User.objects.get(username=login_clipper)
# Ici, on nous a menti, le user existe déjà
username = user.username
clipper = False
except User.DoesNotExist:
# Clipper (sans user déjà existant)
# UserForm - Prefill
user_initial = {
"username": login_clipper,
"email": "%s@clipper.ens.fr" % login_clipper,
}
if fullname:
# Prefill du nom et prénom
names = fullname.split()
# Le premier, c'est le prénom
user_initial["first_name"] = names[0]
if len(names) > 1:
# Si d'autres noms -> tous dans le nom de famille
user_initial["last_name"] = " ".join(names[1:])
# CofForm - Prefill
cof_initial = {"login_clipper": login_clipper}
# Form créations
if request:
user_form = UserForm(request.POST, initial=user_initial)
cof_form = CofForm(request.POST, initial=cof_initial)
else:
user_form = UserForm(initial=user_initial)
cof_form = CofForm(initial=cof_initial)
# Protection (read-only) des champs username et login_clipper
account_form_set_readonly_fields(user_form, cof_form)
if username and not clipper:
try:
user = User.objects.get(username=username)
# le user existe déjà
# récupération du profil cof
(cof, _) = CofProfile.objects.get_or_create(user=user)
# UserForm + CofForm - Création à partir des instances existantes
if request:
user_form = UserForm(request.POST, instance=user)
cof_form = CofForm(request.POST, instance=cof)
else:
user_form = UserForm(instance=user)
cof_form = CofForm(instance=cof)
# Protection (read-only) des champs username, login_clipper et is_cof
account_form_set_readonly_fields(user_form, cof_form)
except User.DoesNotExist:
# le username donnée n'existe pas -> Création depuis rien
# (éventuellement en cours avec erreurs précédemment)
pass
if not user and not clipper:
# connaît pas du tout, faut tout remplir
if request:
user_form = UserForm(request.POST)
cof_form = CofForm(request.POST)
else:
user_form = UserForm()
cof_form = CofForm()
# mais on laisse le username en écriture
cof_form.fields["login_clipper"].widget.attrs["readonly"] = True
cof_form.fields["is_cof"].widget.attrs["disabled"] = True
if request:
account_form = AccountNoTriForm(request.POST)
else:
account_form = AccountNoTriForm()
return {"account_form": account_form, "cof_form": cof_form, "user_form": user_form}
@login_required
@teamkfet_required
def account_create_ajax(request, username=None, login_clipper=None, fullname=None):
forms = get_account_create_forms(
request=None, username=username, login_clipper=login_clipper, fullname=fullname
)
return render(
request,
"kfet/account_create_form.html",
{
"account_form": forms["account_form"],
"cof_form": forms["cof_form"],
"user_form": forms["user_form"],
},
)
# Account - Read
@login_required
def account_read(request, trigramme):
account = get_object_or_404(Account, trigramme=trigramme)
# Checking permissions
if not account.readable or (
not request.user.has_perm("kfet.is_team") and request.user != account.user
):
raise Http404
addcosts = (
OperationGroup.objects.filter(opes__addcost_for=account, opes__canceled_at=None)
.extra({"date": "date(at)"})
.values("date")
.annotate(sum_addcosts=Sum("opes__addcost_amount"))
.order_by("-date")
)
return render(
request, "kfet/account_read.html", {"account": account, "addcosts": addcosts}
)
# Account - Update
@teamkfet_required
@kfet_password_auth
def account_update(request, trigramme):
account = get_object_or_404(Account, trigramme=trigramme)
# Checking permissions
if not account.editable:
# Plus de leak de trigramme !
return HttpResponseForbidden
user_info_form = UserInfoForm(instance=account.user)
account_form = AccountForm(instance=account)
group_form = UserGroupForm(instance=account.user)
frozen_form = AccountFrozenForm(instance=account)
pwd_form = AccountPwdForm()
if request.method == "POST":
self_update = request.user == account.user
account_form = AccountForm(request.POST, instance=account)
group_form = UserGroupForm(request.POST, instance=account.user)
frozen_form = AccountFrozenForm(request.POST, instance=account)
pwd_form = AccountPwdForm(request.POST, account=account)
forms = []
warnings = []
if self_update or request.user.has_perm("kfet.change_account"):
forms.append(account_form)
elif account_form.has_changed():
warnings.append("compte")
if request.user.has_perm("kfet.manage_perms"):
forms.append(group_form)
forms.append(frozen_form)
elif group_form.has_changed():
warnings.append("statut d'équipe")
# Il ne faut pas valider `pwd_form` si elle est inchangée
if pwd_form.has_changed():
if self_update or request.user.has_perm("kfet.change_account_password"):
forms.append(pwd_form)
else:
warnings.append("mot de passe")
# Updating account info
if forms == []:
messages.error(
request,
"Informations non mises à jour : permission refusée",
extra_tags="permission-denied",
)
else:
if all(form.is_valid() for form in forms):
for form in forms:
form.save()
if len(warnings):
messages.warning(
request,
"Permissions insuffisantes pour modifier"
" les informations suivantes : {}.".format(", ".join(warnings)),
)
if self_update:
messages.success(request, "Vos informations ont été mises à jour !")
else:
messages.success(
request,
"Informations du compte %s mises à jour" % account.trigramme,
)
return redirect("kfet.account.read", account.trigramme)
else:
messages.error(
request, "Informations non mises à jour : corrigez les erreurs"
)
return render(
request,
"kfet/account_update.html",
{
"user_info_form": user_info_form,
"account": account,
"account_form": account_form,
"frozen_form": frozen_form,
"group_form": group_form,
"pwd_form": pwd_form,
},
)
# Account - Delete
class AccountDelete(PermissionRequiredMixin, DeleteView):
model = Account
slug_field = "trigramme"
slug_url_kwarg = "trigramme"
success_url = reverse_lazy("kfet.account")
success_message = "Compte supprimé avec succès !"
permission_required = "kfet.delete_account"
http_method_names = ["post"]
def delete(self, request, *args, **kwargs):
self.object = self.get_object()
if self.object.balance >= 0.01:
messages.error(
request,
"Impossible de supprimer un compte "
"avec une balance strictement positive !",
)
return redirect("kfet.account.read", self.object.trigramme)
if self.object.trigramme in [
"LIQ",
KFET_GENERIC_TRIGRAMME,
KFET_DELETED_TRIGRAMME,
"#13",
]:
messages.error(request, "Impossible de supprimer un trigramme protégé !")
return redirect("kfet.account.read", self.object.trigramme)
# SuccessMessageMixin does not work with DeleteView, see :
# https://code.djangoproject.com/ticket/21926
messages.success(request, self.success_message)
return super().delete(request, *args, **kwargs)
class AccountNegativeList(ListView):
queryset = (
AccountNegative.objects.select_related("account", "account__cofprofile__user")
.filter(account__balance__lt=0)
.exclude(account__trigramme="#13")
)
template_name = "kfet/account_negative.html"
context_object_name = "negatives"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
balances = (neg.account.balance for neg in self.object_list)
context["negatives_sum"] = sum(balances)
return context
# -----
# Checkout views
# -----
# Checkout - General
class CheckoutList(ListView):
model = Checkout
template_name = "kfet/checkout.html"
context_object_name = "checkouts"
# Checkout - Create
@method_decorator(kfet_password_auth, name="dispatch")
class CheckoutCreate(SuccessMessageMixin, CreateView):
model = Checkout
template_name = "kfet/checkout_create.html"
form_class = CheckoutForm
success_message = "Nouvelle caisse : %(name)s"
# Surcharge de la validation
def form_valid(self, form):
# Checking permission
if not self.request.user.has_perm("kfet.add_checkout"):
form.add_error(
None, ValidationError("Permission refusée", code="permission-denied")
)
return self.form_invalid(form)
# Creating
form.instance.created_by = self.request.user.profile.account_kfet
form.save()
return super().form_valid(form)
# Checkout - Read
class CheckoutRead(DetailView):
model = Checkout
template_name = "kfet/checkout_read.html"
context_object_name = "checkout"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["statements"] = context["checkout"].statements.order_by("-at")
return context
# Checkout - Update
@method_decorator(kfet_password_auth, name="dispatch")
class CheckoutUpdate(SuccessMessageMixin, UpdateView):
model = Checkout
template_name = "kfet/checkout_update.html"
form_class = CheckoutRestrictForm
success_message = "Informations mises à jour pour la caisse : %(name)s"
# Surcharge de la validation
def form_valid(self, form):
# Checking permission
if not self.request.user.has_perm("kfet.change_checkout"):
form.add_error(
None, ValidationError("Permission refusée", code="permission-denied")
)
return self.form_invalid(form)
# Updating
return super().form_valid(form)
# -----
# Checkout Statement views
# -----
# Checkout Statement - General
class CheckoutStatementList(ListView):
model = CheckoutStatement
queryset = CheckoutStatement.objects.order_by("-at")
template_name = "kfet/checkoutstatement.html"
context_object_name = "checkoutstatements"
# Checkout Statement - Create
def getAmountTaken(data):
return Decimal(
data.taken_001 * 0.01
+ data.taken_002 * 0.02
+ data.taken_005 * 0.05
+ data.taken_01 * 0.1
+ data.taken_02 * 0.2
+ data.taken_05 * 0.5
+ data.taken_1 * 1
+ data.taken_2 * 2
+ data.taken_5 * 5
+ data.taken_10 * 10
+ data.taken_20 * 20
+ data.taken_50 * 50
+ data.taken_100 * 100
+ data.taken_200 * 200
+ data.taken_500 * 500
+ float(data.taken_cheque)
)
def getAmountBalance(data):
return Decimal(
data["balance_001"] * 0.01
+ data["balance_002"] * 0.02
+ data["balance_005"] * 0.05
+ data["balance_01"] * 0.1
+ data["balance_02"] * 0.2
+ data["balance_05"] * 0.5
+ data["balance_1"] * 1
+ data["balance_2"] * 2
+ data["balance_5"] * 5
+ data["balance_10"] * 10
+ data["balance_20"] * 20
+ data["balance_50"] * 50
+ data["balance_100"] * 100
+ data["balance_200"] * 200
+ data["balance_500"] * 500
)
@method_decorator(kfet_password_auth, name="dispatch")
class CheckoutStatementCreate(SuccessMessageMixin, CreateView):
model = CheckoutStatement
template_name = "kfet/checkoutstatement_create.html"
form_class = CheckoutStatementCreateForm
success_message = "Nouveau relevé : %(checkout)s - %(at)s"
def get_success_url(self):
return reverse_lazy(
"kfet.checkout.read", kwargs={"pk": self.kwargs["pk_checkout"]}
)
def get_success_message(self, cleaned_data):
return self.success_message % dict(
cleaned_data, checkout=self.object.checkout.name, at=self.object.at
)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
checkout = Checkout.objects.get(pk=self.kwargs["pk_checkout"])
context["checkout"] = checkout
return context
def form_valid(self, form):
# Checking permission
if not self.request.user.has_perm("kfet.add_checkoutstatement"):
form.add_error(
None, ValidationError("Permission refusée", code="permission-denied")
)
return self.form_invalid(form)
# Creating
form.instance.amount_taken = getAmountTaken(form.instance)
if not form.instance.not_count:
form.instance.balance_new = getAmountBalance(form.cleaned_data)
form.instance.checkout_id = self.kwargs["pk_checkout"]
form.instance.by = self.request.user.profile.account_kfet
return super().form_valid(form)
@method_decorator(kfet_password_auth, name="dispatch")
class CheckoutStatementUpdate(SuccessMessageMixin, UpdateView):
model = CheckoutStatement
template_name = "kfet/checkoutstatement_update.html"
form_class = CheckoutStatementUpdateForm
success_message = "Relevé modifié"
def get_success_url(self):
return reverse_lazy(
"kfet.checkout.read", kwargs={"pk": self.kwargs["pk_checkout"]}
)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
checkout = Checkout.objects.get(pk=self.kwargs["pk_checkout"])
context["checkout"] = checkout
return context
def form_valid(self, form):
# Checking permission
if not self.request.user.has_perm("kfet.change_checkoutstatement"):
form.add_error(
None, ValidationError("Permission refusée", code="permission-denied")
)
return self.form_invalid(form)
# Updating
form.instance.amount_taken = getAmountTaken(form.instance)
return super().form_valid(form)
# -----
# Category views
# -----
# Category - General
class CategoryList(ListView):
queryset = ArticleCategory.objects.prefetch_related("articles").order_by("name")
template_name = "kfet/category.html"
context_object_name = "categories"
# Category - Update
@method_decorator(kfet_password_auth, name="dispatch")
class CategoryUpdate(SuccessMessageMixin, UpdateView):
model = ArticleCategory
template_name = "kfet/category_update.html"
form_class = CategoryForm
success_url = reverse_lazy("kfet.category")
success_message = "Informations mises à jour pour la catégorie : %(name)s"
# Surcharge de la validation
def form_valid(self, form):
# Checking permission
if not self.request.user.has_perm("kfet.change_articlecategory"):
form.add_error(
None, ValidationError("Permission refusée", code="permission-denied")
)
return self.form_invalid(form)
# Updating
return super().form_valid(form)
# -----
# Article views
# -----
# Article - General
class ArticleList(ListView):
queryset = (
Article.objects.select_related("category")
.prefetch_related(
Prefetch(
"inventories",
queryset=Inventory.objects.order_by("-at"),
to_attr="inventory",
)
)
.order_by("category__name", "-is_sold", "name")
)
template_name = "kfet/article.html"
context_object_name = "articles"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
articles = context[self.context_object_name]
context["nb_articles"] = len(articles)
context[self.context_object_name] = articles.filter(is_sold=True)
context["not_sold_articles"] = articles.filter(is_sold=False)
return context
# Article - Create
@method_decorator(kfet_password_auth, name="dispatch")
class ArticleCreate(SuccessMessageMixin, CreateView):
model = Article
template_name = "kfet/article_create.html"
form_class = ArticleForm
success_message = "Nouvel item : %(category)s - %(name)s"
# Surcharge de la validation
def form_valid(self, form):
# Checking permission
if not self.request.user.has_perm("kfet.add_article"):
form.add_error(
None, ValidationError("Permission refusée", code="permission-denied")
)
return self.form_invalid(form)
# Save ici pour save le manytomany suppliers
article = form.save()
# Save des suppliers déjà existant
for supplier in form.cleaned_data["suppliers"]:
SupplierArticle.objects.create(article=article, supplier=supplier)
# Nouveau supplier
supplier_new = form.cleaned_data["supplier_new"].strip()
if supplier_new:
supplier, created = Supplier.objects.get_or_create(name=supplier_new)
if created:
SupplierArticle.objects.create(article=article, supplier=supplier)
# Inventaire avec stock initial
inventory = Inventory()
inventory.by = self.request.user.profile.account_kfet
inventory.save()
InventoryArticle.objects.create(
inventory=inventory,
article=article,
stock_old=article.stock,
stock_new=article.stock,
)
# Creating
return super().form_valid(form)
# Article - Read
class ArticleRead(DetailView):
model = Article
template_name = "kfet/article_read.html"
context_object_name = "article"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
inventoryarts = (
InventoryArticle.objects.filter(article=self.object)
.select_related("inventory")
.order_by("-inventory__at")
)
context["inventoryarts"] = inventoryarts
supplierarts = (
SupplierArticle.objects.filter(article=self.object)
.select_related("supplier")
.order_by("-at")
)
context["supplierarts"] = supplierarts
return context
# Article - Update
@method_decorator(kfet_password_auth, name="dispatch")
class ArticleUpdate(SuccessMessageMixin, UpdateView):
model = Article
template_name = "kfet/article_update.html"
form_class = ArticleRestrictForm
success_message = "Informations mises à jour pour l'article : %(name)s"
# Surcharge de la validation
def form_valid(self, form):
# Checking permission
if not self.request.user.has_perm("kfet.change_article"):
form.add_error(
None, ValidationError("Permission refusée", code="permission-denied")
)
return self.form_invalid(form)
# Save ici pour save le manytomany suppliers
article = form.save()
# Save des suppliers déjà existant
for supplier in form.cleaned_data["suppliers"]:
if supplier not in article.suppliers.all():
SupplierArticle.objects.create(article=article, supplier=supplier)
# On vire les suppliers désélectionnés
for supplier in article.suppliers.all():
if supplier not in form.cleaned_data["suppliers"]:
SupplierArticle.objects.filter(
article=article, supplier=supplier
).delete()
# Nouveau supplier
supplier_new = form.cleaned_data["supplier_new"].strip()
if supplier_new:
supplier, created = Supplier.objects.get_or_create(name=supplier_new)
if created:
SupplierArticle.objects.create(article=article, supplier=supplier)
# Updating
return super().form_valid(form)
class ArticleDelete(PermissionRequiredMixin, DeleteView):
model = Article
success_url = reverse_lazy("kfet.article")
success_message = "Article supprimé avec succès !"
permission_required = "kfet.delete_article"
def get(self, request, *args, **kwargs):
return redirect("kfet.article.read", self.kwargs.get(self.pk_url_kwarg))
def delete(self, request, *args, **kwargs):
messages.success(request, self.success_message)
return super().delete(request, *args, **kwargs)
# -----
# K-Psul
# -----
@teamkfet_required
def kpsul(request):
data = {}
data["operationgroup_form"] = KPsulOperationGroupForm()
data["trigramme_form"] = KPsulAccountForm()
data["checkout_form"] = KPsulCheckoutForm()
data["operation_formset"] = KPsulOperationFormSet(queryset=Operation.objects.none())
return render(request, "kfet/kpsul.html", data)
@teamkfet_required
def kpsul_get_settings(request):
addcost_for = kfet_config.addcost_for
data = {
"subvention_cof": kfet_config.subvention_cof,
"addcost_for": addcost_for and addcost_for.trigramme or "",
"addcost_amount": kfet_config.addcost_amount,
}
return JsonResponse(data)
@teamkfet_required
def account_read_json(request, trigramme):
account = get_object_or_404(Account, trigramme=trigramme)
if not account.readable:
raise Http404
data = {
"id": account.pk,
"name": account.name,
"email": account.email,
"is_cof": account.is_cof,
"promo": account.promo,
"balance": account.balance,
"is_frozen": account.is_frozen,
"departement": account.departement,
"nickname": account.nickname,
"trigramme": account.trigramme,
}
return JsonResponse(data)
@teamkfet_required
def kpsul_checkout_data(request):
pk = request.POST.get("pk", 0)
if not pk:
pk = 0
data = (
Checkout.objects.annotate(
last_statement_by_first_name=F(
"statements__by__cofprofile__user__first_name"
),
last_statement_by_last_name=F(
"statements__by__cofprofile__user__last_name"
),
last_statement_by_trigramme=F("statements__by__trigramme"),
last_statement_balance=F("statements__balance_new"),
last_statement_at=F("statements__at"),
)
.select_related(
"statements" "statements__by", "statements__by__cofprofile__user"
)
.filter(pk=pk)
.order_by("statements__at")
.values(
"id",
"name",
"balance",
"valid_from",
"valid_to",
"last_statement_balance",
"last_statement_at",
"last_statement_by_trigramme",
"last_statement_by_last_name",
"last_statement_by_first_name",
)
.last()
)
if data is None:
raise Http404
return JsonResponse(data)
@teamkfet_required
@kfet_password_auth
def kpsul_update_addcost(request):
addcost_form = AddcostForm(request.POST)
data = {"errors": []}
if not addcost_form.is_valid():
for field, errors in addcost_form.errors.items():
for error in errors:
data["errors"].append({"code": f"invalid_{field}", "message": error})
return JsonResponse(data, status=400)
required_perms = ["kfet.manage_addcosts"]
if not request.user.has_perms(required_perms):
data["missing_perms"] = get_missing_perms(required_perms, request.user)
return JsonResponse(data, status=403)
trigramme = addcost_form.cleaned_data["trigramme"]
account = trigramme and Account.objects.get(trigramme=trigramme) or None
amount = addcost_form.cleaned_data["amount"]
kfet_config.set(addcost_for=account, addcost_amount=amount)
data = {"addcost": {"for": account and account.trigramme or None, "amount": amount}}
consumers.KPsul.group_send("kfet.kpsul", data)
return JsonResponse(data)
def get_missing_perms(required_perms: List[str], user: User) -> List[str]:
def get_perm_name(app_label: str, codename: str) -> str:
return Permission.objects.values_list("name", flat=True).get(
codename=codename, content_type__app_label=app_label
)
missing_perms = [
get_perm_name(*perm.split("."))
for perm in required_perms
if not user.has_perm(perm)
]
return missing_perms
@teamkfet_required
@kfet_password_auth
def kpsul_perform_operations(request):
# Initializing response data
data = {"errors": []}
# Checking operationgroup
operationgroup_form = KPsulOperationGroupForm(request.POST)
if not operationgroup_form.is_valid():
for field in operationgroup_form.errors:
verbose_field, feminin = (
("compte", "") if field == "on_acc" else ("caisse", "e")
)
data["errors"].append(
{
"code": f"invalid_{field}",
"message": f"Pas de {verbose_field} sélectionné{feminin}",
}
)
# Checking operation_formset
operation_formset = KPsulOperationFormSet(request.POST)
if not operation_formset.is_valid():
data["errors"].append(
{
"code": "invalid_formset",
"message": "Formulaire d'opérations vide ou invalide",
}
)
# Returning BAD REQUEST if errors
if data["errors"]:
return JsonResponse(data, status=400)
# Pre-saving (no commit)
operationgroup = operationgroup_form.save(commit=False)
operations = operation_formset.save(commit=False)
on_acc = operationgroup.on_acc
# Retrieving COF grant
cof_grant = kfet_config.subvention_cof
# Retrieving addcosts data
addcost_amount = kfet_config.addcost_amount
addcost_for = kfet_config.addcost_for
# Initializing vars
required_perms = set() # Required perms to perform all operations
cof_grant_divisor = 1 + cof_grant / 100
to_addcost_for_balance = 0 # For balance of addcost_for
to_checkout_balance = 0 # For balance of selected checkout
to_articles_stocks = defaultdict(lambda: 0) # For stocks articles
is_addcost = all((addcost_for, addcost_amount, addcost_for != on_acc))
need_comment = on_acc.need_comment
if on_acc.is_frozen:
data["errors"].append(
{"code": "frozen_acc", "message": f"Le compte {on_acc.trigramme} est gelé"}
)
# Filling data of each operations
# + operationgroup + calculating other stuffs
for operation in operations:
if operation.type == Operation.PURCHASE:
operation.amount = -operation.article.price * operation.article_nb
if is_addcost & operation.article.category.has_addcost:
operation.addcost_for = addcost_for
operation.addcost_amount = addcost_amount * operation.article_nb
operation.amount -= operation.addcost_amount
to_addcost_for_balance += operation.addcost_amount
if on_acc.is_cash:
to_checkout_balance += -operation.amount
if on_acc.is_cof and operation.article.category.has_reduction:
if is_addcost and operation.article.category.has_addcost:
operation.addcost_amount /= cof_grant_divisor
operation.amount = operation.amount / cof_grant_divisor
to_articles_stocks[operation.article] -= operation.article_nb
else:
if on_acc.is_cash:
data["errors"].append(
{
"code": "invalid_liq",
"message": (
"Impossible de compter autre chose que des achats sur LIQ"
),
}
)
if operation.type != Operation.EDIT:
to_checkout_balance += operation.amount
operationgroup.amount += operation.amount
if operation.type == Operation.DEPOSIT:
required_perms.add("kfet.perform_deposit")
if operation.type == Operation.EDIT:
required_perms.add("kfet.edit_balance_account")
need_comment = True
if on_acc.is_cof:
to_addcost_for_balance = to_addcost_for_balance / cof_grant_divisor
(perms, stop) = on_acc.perms_to_perform_operation(amount=operationgroup.amount)
required_perms |= perms
if stop:
data["errors"].append(
{
"code": "negative",
"message": f"Le compte {on_acc.trigramme} a un solde insuffisant.",
}
)
if need_comment:
operationgroup.comment = operationgroup.comment.strip()
if not operationgroup.comment:
data["need_comment"] = True
if data["errors"] or "need_comment" in data:
return JsonResponse(data, status=400)
if not request.user.has_perms(required_perms):
data["missing_perms"] = get_missing_perms(required_perms, request.user)
return JsonResponse(data, status=403)
# If 1 perm is required, filling who perform the operations
if required_perms:
operationgroup.valid_by = request.user.profile.account_kfet
# Filling cof status for statistics
operationgroup.is_cof = on_acc.is_cof
# Starting transaction to ensure data consistency
with transaction.atomic():
# If not cash account,
# saving account's balance and adding to Negative if not in
if not on_acc.is_cash:
(
Account.objects.filter(pk=on_acc.pk).update(
balance=F("balance") + operationgroup.amount
)
)
on_acc.refresh_from_db()
on_acc.update_negative()
# Updating checkout's balance
if to_checkout_balance:
Checkout.objects.filter(pk=operationgroup.checkout.pk).update(
balance=F("balance") + to_checkout_balance
)
# Saving addcost_for with new balance if there is one
if is_addcost and to_addcost_for_balance:
Account.objects.filter(pk=addcost_for.pk).update(
balance=F("balance") + to_addcost_for_balance
)
# Saving operation group
operationgroup.save()
# Filling operationgroup id for each operations and saving
for operation in operations:
operation.group = operationgroup
operation.save()
# Updating articles stock
for article in to_articles_stocks:
Article.objects.filter(pk=article.pk).update(
stock=F("stock") + to_articles_stocks[article]
)
# Websocket data
websocket_data = {}
websocket_data["groups"] = [
{
"add": True,
"type": "operation",
"id": operationgroup.pk,
"amount": operationgroup.amount,
"checkout__name": operationgroup.checkout.name,
"at": operationgroup.at,
"is_cof": operationgroup.is_cof,
"comment": operationgroup.comment,
"valid_by__trigramme": (
operationgroup.valid_by and operationgroup.valid_by.trigramme or None
),
"on_acc__trigramme": on_acc.trigramme,
"entries": [],
}
]
for operation in operations:
ope_data = {
"id": operation.pk,
"type": operation.type,
"amount": operation.amount,
"addcost_amount": operation.addcost_amount,
"addcost_for__trigramme": (
operation.addcost_for and addcost_for.trigramme or None
),
"article__name": (operation.article and operation.article.name or None),
"article_nb": operation.article_nb,
"group_id": operationgroup.pk,
"canceled_by__trigramme": None,
"canceled_at": None,
}
websocket_data["groups"][0]["entries"].append(ope_data)
# Need refresh from db cause we used update on queryset
operationgroup.checkout.refresh_from_db()
websocket_data["checkouts"] = [
{"id": operationgroup.checkout.pk, "balance": operationgroup.checkout.balance}
]
websocket_data["articles"] = []
# Need refresh from db cause we used update on querysets
articles_pk = [article.pk for article in to_articles_stocks]
articles = Article.objects.values("id", "stock").filter(pk__in=articles_pk)
for article in articles:
websocket_data["articles"].append(
{"id": article["id"], "stock": article["stock"]}
)
consumers.KPsul.group_send("kfet.kpsul", websocket_data)
return JsonResponse(data)
@teamkfet_required
@kfet_password_auth
def cancel_operations(request):
# Pour la réponse
data = {"canceled": [], "warnings": {}, "errors": []}
# Checking if BAD REQUEST (opes_pk not int or not existing)
try:
# Set pour virer les doublons
opes_post = set(
map(int, filter(None, request.POST.getlist("operations[]", [])))
)
except ValueError:
data["errors"].append(
{"code": "invalid_request", "message": "Requête invalide !"}
)
return JsonResponse(data, status=400)
opes_all = Operation.objects.select_related(
"group", "group__on_acc", "group__on_acc__negative"
).filter(pk__in=opes_post)
opes_pk = [ope.pk for ope in opes_all]
opes_notexisting = [ope for ope in opes_post if ope not in opes_pk]
if opes_notexisting:
data["errors"].append(
{
"code": "cancel_missing",
"message": "Opérations inexistantes : {}".format(
", ".join(map(str, opes_notexisting))
),
}
)
return JsonResponse(data, status=400)
opes_already_canceled = [] # Déjà annulée
opes = [] # Pas déjà annulée
required_perms = set()
cancel_duration = kfet_config.cancel_duration
# Modifs à faire sur les balances des comptes
to_accounts_balances = defaultdict(int)
# ------ sur les montants des groupes d'opé
to_groups_amounts = defaultdict(int)
# ------ sur les balances de caisses
to_checkouts_balances = defaultdict(int)
# ------ sur les stocks d'articles
to_articles_stocks = defaultdict(int)
for ope in opes_all:
if ope.canceled_at:
# Opération déjà annulée, va pour un warning en Response
opes_already_canceled.append(ope.pk)
else:
opes.append(ope.pk)
# Si opé il y a plus de CANCEL_DURATION, permission requise
if ope.group.at + cancel_duration < timezone.now():
required_perms.add("kfet.cancel_old_operations")
# Calcul de toutes modifs à faire en cas de validation
# Pour les balances de comptes
if not ope.group.on_acc.is_cash:
to_accounts_balances[ope.group.on_acc] -= ope.amount
if ope.addcost_for and ope.addcost_amount:
to_accounts_balances[ope.addcost_for] -= ope.addcost_amount
# Pour les groupes d'opés
to_groups_amounts[ope.group] -= ope.amount
# Pour les balances de caisses
# Les balances de caisses dont il y a eu un relevé depuis la date
# de la commande ne doivent pas être modifiées
# TODO : Prendre en compte le dernier relevé où la caisse a été
# comptée et donc modifier les balance_old (et amount_error)
# des relevés suivants.
# Note : Dans le cas où un CheckoutStatement est mis à jour
# par `.save()`, amount_error est recalculé automatiquement,
# ce qui n'est pas le cas en faisant un update sur queryset
# TODO ? : Maj les balance_old de relevés pour modifier l'erreur
last_statement = (
CheckoutStatement.objects.filter(checkout=ope.group.checkout)
.order_by("at")
.last()
)
if not last_statement or last_statement.at < ope.group.at:
if ope.is_checkout:
if ope.group.on_acc.is_cash:
to_checkouts_balances[ope.group.checkout] -= -ope.amount
else:
to_checkouts_balances[ope.group.checkout] -= ope.amount
# Pour les stocks d'articles
# Les stocks d'articles dont il y a eu un inventaire depuis la date
# de la commande ne doivent pas être modifiés
# TODO : Prendre en compte le dernier inventaire où le stock a bien
# été compté (pas dans le cas d'une livraison).
# Note : si InventoryArticle est maj par .save(), stock_error
# est recalculé automatiquement
if ope.article and ope.article_nb:
last_stock = (
InventoryArticle.objects.select_related("inventory")
.filter(article=ope.article)
.order_by("inventory__at")
.last()
)
if not last_stock or last_stock.inventory.at < ope.group.at:
to_articles_stocks[ope.article] += ope.article_nb
if not opes:
data["warnings"]["already_canceled"] = opes_already_canceled
return JsonResponse(data)
negative_accounts = []
# Checking permissions or stop
for account in to_accounts_balances:
(perms, stop) = account.perms_to_perform_operation(
amount=to_accounts_balances[account]
)
required_perms |= perms
if stop:
negative_accounts.append(account.trigramme)
if negative_accounts:
data["errors"].append(
{
"code": "negative",
"message": "Solde insuffisant pour les comptes suivants : {}".format(
", ".join(negative_accounts)
),
}
)
return JsonResponse(data, status=400)
if not request.user.has_perms(required_perms):
data["missing_perms"] = get_missing_perms(required_perms, request.user)
return JsonResponse(data, status=403)
canceled_by = required_perms and request.user.profile.account_kfet or None
canceled_at = timezone.now()
with transaction.atomic():
(
Operation.objects.filter(pk__in=opes).update(
canceled_by=canceled_by, canceled_at=canceled_at
)
)
for account in to_accounts_balances:
(
Account.objects.filter(pk=account.pk).update(
balance=F("balance") + to_accounts_balances[account]
)
)
if not account.is_cash:
# Should always be true, but we want to be sure
account.refresh_from_db()
account.update_negative()
for checkout in to_checkouts_balances:
Checkout.objects.filter(pk=checkout.pk).update(
balance=F("balance") + to_checkouts_balances[checkout]
)
for group in to_groups_amounts:
OperationGroup.objects.filter(pk=group.pk).update(
amount=F("amount") + to_groups_amounts[group]
)
for article in to_articles_stocks:
Article.objects.filter(pk=article.pk).update(
stock=F("stock") + to_articles_stocks[article]
)
# Need refresh from db cause we used update on querysets.
# Sort objects by pk to get deterministic responses.
opegroups_pk = [opegroup.pk for opegroup in to_groups_amounts]
opegroups = (
OperationGroup.objects.values("id", "amount", "is_cof")
.filter(pk__in=opegroups_pk)
.order_by("pk")
)
opes = (
Operation.objects.values("id", "canceled_at", "canceled_by__trigramme")
.filter(pk__in=opes)
.order_by("pk")
)
checkouts_pk = [checkout.pk for checkout in to_checkouts_balances]
checkouts = (
Checkout.objects.values("id", "balance")
.filter(pk__in=checkouts_pk)
.order_by("pk")
)
articles_pk = [article.pk for articles in to_articles_stocks]
articles = Article.objects.values("id", "stock").filter(pk__in=articles_pk)
# Websocket data
websocket_data = {"checkouts": [], "articles": []}
for checkout in checkouts:
websocket_data["checkouts"].append(
{"id": checkout["id"], "balance": checkout["balance"]}
)
for article in articles:
websocket_data["articles"].append(
{"id": article["id"], "stock": article["stock"]}
)
consumers.KPsul.group_send("kfet.kpsul", websocket_data)
data["canceled"] = list(opes)
data["opegroups_to_update"] = list(opegroups)
if opes_already_canceled:
data["warnings"]["already_canceled"] = opes_already_canceled
return JsonResponse(data)
def get_history_limit(user) -> Tuple[datetime, datetime]:
"""returns a tuple of 2 dates
- the earliest date the given user can view history of any account
- the earliest date the given user can view history of special accounts
(LIQ and #13)"""
now = timezone.now()
if user.has_perm("kfet.access_old_history"):
return (
now - settings.KFET_HISTORY_LONG_DATE_LIMIT,
settings.KFET_HISTORY_NO_DATE_LIMIT,
)
if user.has_perm("kfet.is_team"):
limit = now - settings.KFET_HISTORY_DATE_LIMIT
return limit, limit
# should not happen - future earliest date
future = now + timedelta(days=1)
return future, future
@login_required
def history_json(request):
# Récupération des paramètres
form = FilterHistoryForm(request.GET)
if not form.is_valid():
return HttpResponseBadRequest()
start = form.cleaned_data["start"]
end = form.cleaned_data["end"]
account = form.cleaned_data["account"]
checkout = form.cleaned_data["checkout"]
transfers_only = form.cleaned_data["transfers_only"]
opes_only = form.cleaned_data["opes_only"]
# Construction de la requête (sur les transferts) pour le prefetch
transfer_queryset_prefetch = Transfer.objects.select_related(
"from_acc", "to_acc", "canceled_by"
)
# Le check sur les comptes est dans le prefetch pour les transferts
if account:
transfer_queryset_prefetch = transfer_queryset_prefetch.filter(
Q(from_acc=account) | Q(to_acc=account)
)
if not request.user.has_perm("kfet.is_team"):
try:
acc = request.user.profile.account_kfet
transfer_queryset_prefetch = transfer_queryset_prefetch.filter(
Q(from_acc=acc) | Q(to_acc=acc)
)
except Account.DoesNotExist:
return JsonResponse({}, status=403)
transfer_prefetch = Prefetch(
"transfers", queryset=transfer_queryset_prefetch, to_attr="filtered_transfers"
)
# Construction de la requête (sur les opérations) pour le prefetch
ope_queryset_prefetch = Operation.objects.select_related(
"article", "canceled_by", "addcost_for"
)
ope_prefetch = Prefetch("opes", queryset=ope_queryset_prefetch)
# Construction de la requête principale
opegroups = (
OperationGroup.objects.prefetch_related(ope_prefetch)
.select_related("on_acc", "valid_by")
.order_by("at")
)
transfergroups = (
TransferGroup.objects.prefetch_related(transfer_prefetch)
.select_related("valid_by")
.order_by("at")
)
# limite l'accès à l'historique plus vieux que settings.KFET_HISTORY_DATE_LIMIT
limit_date = True
# Application des filtres
if start:
opegroups = opegroups.filter(at__gte=start)
transfergroups = transfergroups.filter(at__gte=start)
if end:
opegroups = opegroups.filter(at__lt=end)
transfergroups = transfergroups.filter(at__lt=end)
if checkout:
opegroups = opegroups.filter(checkout=checkout)
transfergroups = TransferGroup.objects.none()
if transfers_only:
opegroups = OperationGroup.objects.none()
if opes_only:
transfergroups = TransferGroup.objects.none()
if account:
opegroups = opegroups.filter(on_acc=account)
if account.user == request.user:
limit_date = False # pas de limite de date sur son propre historique
# Un non-membre de l'équipe n'a que accès à son historique
elif not request.user.has_perm("kfet.is_team"):
# un non membre de la kfet doit avoir le champ account
# pré-rempli, cette requête est douteuse
return JsonResponse({}, status=403)
if limit_date:
# limiter l'accès à l'historique ancien pour confidentialité
earliest_date, earliest_date_no_limit = get_history_limit(request.user)
if (
account
and account.trigramme in settings.KFET_HISTORY_NO_DATE_LIMIT_TRIGRAMMES
):
earliest_date = earliest_date_no_limit
opegroups = opegroups.filter(at__gte=earliest_date)
transfergroups = transfergroups.filter(at__gte=earliest_date)
# Construction de la réponse
history_groups = []
for opegroup in opegroups:
opegroup_dict = {
"type": "operation",
"id": opegroup.id,
"amount": opegroup.amount,
"at": opegroup.at,
"checkout_id": opegroup.checkout_id,
"is_cof": opegroup.is_cof,
"comment": opegroup.comment,
"entries": [],
"on_acc__trigramme": opegroup.on_acc and opegroup.on_acc.trigramme or None,
}
if request.user.has_perm("kfet.is_team"):
opegroup_dict["valid_by__trigramme"] = (
opegroup.valid_by and opegroup.valid_by.trigramme or None
)
for ope in opegroup.opes.all():
ope_dict = {
"id": ope.id,
"type": ope.type,
"amount": ope.amount,
"article_nb": ope.article_nb,
"addcost_amount": ope.addcost_amount,
"canceled_at": ope.canceled_at,
"article__name": ope.article and ope.article.name or None,
"addcost_for__trigramme": ope.addcost_for
and ope.addcost_for.trigramme
or None,
}
if request.user.has_perm("kfet.is_team"):
ope_dict["canceled_by__trigramme"] = (
ope.canceled_by and ope.canceled_by.trigramme or None
)
opegroup_dict["entries"].append(ope_dict)
history_groups.append(opegroup_dict)
for transfergroup in transfergroups:
if transfergroup.filtered_transfers:
transfergroup_dict = {
"type": "transfer",
"id": transfergroup.id,
"at": transfergroup.at,
"comment": transfergroup.comment,
"entries": [],
}
if request.user.has_perm("kfet.is_team"):
transfergroup_dict["valid_by__trigramme"] = (
transfergroup.valid_by and transfergroup.valid_by.trigramme or None
)
for transfer in transfergroup.filtered_transfers:
transfer_dict = {
"id": transfer.id,
"amount": transfer.amount,
"canceled_at": transfer.canceled_at,
"from_acc": transfer.from_acc.trigramme,
"to_acc": transfer.to_acc.trigramme,
}
if request.user.has_perm("kfet.is_team"):
transfer_dict["canceled_by__trigramme"] = (
transfer.canceled_by and transfer.canceled_by.trigramme or None
)
transfergroup_dict["entries"].append(transfer_dict)
history_groups.append(transfergroup_dict)
history_groups.sort(key=lambda group: group["at"])
return JsonResponse({"groups": history_groups})
@teamkfet_required
def kpsul_articles_data(request):
articles = Article.objects.values(
"id",
"name",
"price",
"stock",
"category_id",
"category__name",
"category__has_addcost",
"category__has_reduction",
).filter(is_sold=True)
return JsonResponse({"articles": list(articles)})
@teamkfet_required
def history(request):
# These limits are only useful for JS datepickers
# They don't enforce anything and can be bypassed
# Serious checks are done in history_json
history_limit, history_no_limit = get_history_limit(request.user)
history_no_limit_account_ids = Account.objects.filter(
trigramme__in=settings.KFET_HISTORY_NO_DATE_LIMIT_TRIGRAMMES
).values_list("id", flat=True)
format_date = lambda date: date.strftime("%Y-%m-%d %H:%M")
data = {
"filter_form": FilterHistoryForm(),
"history_limit": format_date(history_limit),
"history_no_limit_account_ids": history_no_limit_account_ids,
"history_no_limit": format_date(history_no_limit),
}
return render(request, "kfet/history.html", data)
# -----
# Settings views
# -----
class SettingsList(TemplateView):
template_name = "kfet/settings.html"
config_list = permission_required("kfet.see_config")(SettingsList.as_view())
@method_decorator(kfet_password_auth, name="dispatch")
class SettingsUpdate(SuccessMessageMixin, FormView):
form_class = KFetConfigForm
template_name = "kfet/settings_update.html"
success_message = "Paramètres mis à jour"
success_url = reverse_lazy("kfet.settings")
def form_valid(self, form):
# Checking permission
if not self.request.user.has_perm("kfet.change_config"):
form.add_error(
None, ValidationError("Permission refusée", code="permission-denied")
)
return self.form_invalid(form)
form.save()
return super().form_valid(form)
config_update = permission_required("kfet.change_config")(SettingsUpdate.as_view())
# -----
# Transfer views
# -----
@method_decorator(teamkfet_required, name="dispatch")
class TransferView(TemplateView):
template_name = "kfet/transfers.html"
@teamkfet_required
def transfers_create(request):
transfer_formset = TransferFormSet(queryset=Transfer.objects.none())
return render(
request, "kfet/transfers_create.html", {"transfer_formset": transfer_formset}
)
@teamkfet_required
@kfet_password_auth
def perform_transfers(request):
data = {"errors": []}
# Checking transfer_formset
transfer_formset = TransferFormSet(request.POST)
try:
if not transfer_formset.is_valid():
for form_errors in transfer_formset.errors:
for field, errors in form_errors.items():
if field == "amount":
for error in errors:
data["errors"].append({"code": "amount", "message": error})
else:
# C'est compliqué de trouver le compte qui pose problème...
acc_error = True
if acc_error:
data["errors"].append(
{
"code": "invalid_acc",
"message": "L'un des comptes est invalide ou manquant",
}
)
return JsonResponse(data, status=400)
except ValidationError:
data["errors"].append(
{"code": "invalid_request", "message": "Requête invalide"}
)
return JsonResponse(data, status=400)
transfers = transfer_formset.save(commit=False)
# Initializing vars
required_perms = set(
["kfet.add_transfer"]
) # Required perms to perform all transfers
to_accounts_balances = defaultdict(int) # For balances of accounts
for transfer in transfers:
to_accounts_balances[transfer.from_acc] -= transfer.amount
to_accounts_balances[transfer.to_acc] += transfer.amount
negative_accounts = []
# Checking if ok on all accounts
frozen = set()
for account in to_accounts_balances:
if account.is_frozen:
frozen.add(account.trigramme)
(perms, stop) = account.perms_to_perform_operation(
amount=to_accounts_balances[account]
)
required_perms |= perms
if stop:
negative_accounts.append(account.trigramme)
if frozen:
data["errors"].append(
{
"code": "frozen",
"message": "Les comptes suivants sont gelés : {}".format(
", ".join(frozen)
),
}
)
if negative_accounts:
data["errors"].append(
{
"code": "negative",
"message": "Solde insuffisant pour les comptes suivants : {}".format(
", ".join(negative_accounts)
),
}
)
if data["errors"]:
return JsonResponse(data, status=400)
if not request.user.has_perms(required_perms):
data["missing_perms"] = get_missing_perms(required_perms, request.user)
return JsonResponse(data, status=403)
# Creating transfer group
transfergroup = TransferGroup()
if required_perms:
transfergroup.valid_by = request.user.profile.account_kfet
comment = request.POST.get("comment", "")
transfergroup.comment = comment.strip()
with transaction.atomic():
# Updating balances accounts
for account in to_accounts_balances:
Account.objects.filter(pk=account.pk).update(
balance=F("balance") + to_accounts_balances[account]
)
account.refresh_from_db()
account.update_negative()
# Saving transfer group
transfergroup.save()
# Saving all transfers with group
for transfer in transfers:
transfer.group = transfergroup
transfer.save()
return JsonResponse({})
@teamkfet_required
@kfet_password_auth
def cancel_transfers(request):
# Pour la réponse
data = {"canceled": [], "warnings": {}, "errors": []}
# Checking if BAD REQUEST (transfers_pk not int or not existing)
try:
# Set pour virer les doublons
transfers_post = set(
map(int, filter(None, request.POST.getlist("transfers[]", [])))
)
except ValueError:
data["errors"].append(
{"code": "invalid_request", "message": "Requête invalide !"}
)
return JsonResponse(data, status=400)
transfers_all = Transfer.objects.select_related(
"group", "from_acc", "from_acc__negative", "to_acc", "to_acc__negative"
).filter(pk__in=transfers_post)
transfers_pk = [transfer.pk for transfer in transfers_all]
transfers_notexisting = [
transfer for transfer in transfers_post if transfer not in transfers_pk
]
if transfers_notexisting:
data["errors"].append(
{
"code": "cancel_missing",
"message": "Transferts inexistants : {}".format(
", ".join(map(str, transfers_notexisting))
),
}
)
return JsonResponse(data, status=400)
transfers_already_canceled = [] # Déjà annulés
transfers = [] # Pas déjà annulés
required_perms = set()
cancel_duration = kfet_config.cancel_duration
# Modifs à faire sur les balances des comptes
to_accounts_balances = defaultdict(int)
for transfer in transfers_all:
if transfer.canceled_at:
# Transfert déjà annulé, va pour un warning en Response
transfers_already_canceled.append(transfer.pk)
else:
transfers.append(transfer.pk)
# Si transfer il y a plus de CANCEL_DURATION, permission requise
if transfer.group.at + cancel_duration < timezone.now():
required_perms.add("kfet.cancel_old_operations")
# Calcul de toutes modifs à faire en cas de validation
# Pour les balances de comptes
to_accounts_balances[transfer.from_acc] += transfer.amount
to_accounts_balances[transfer.to_acc] += -transfer.amount
if not transfers:
data["warnings"]["already_canceled"] = transfers_already_canceled
return JsonResponse(data)
negative_accounts = []
# Checking permissions or stop
for account in to_accounts_balances:
(perms, stop) = account.perms_to_perform_operation(
amount=to_accounts_balances[account]
)
required_perms |= perms
if stop:
negative_accounts.append(account.trigramme)
if negative_accounts:
data["errors"].append(
{
"code": "negative",
"message": "Solde insuffisant pour les comptes suivants : {}".format(
", ".join(negative_accounts)
),
}
)
return JsonResponse(data, status=400)
if not request.user.has_perms(required_perms):
data["missing_perms"] = get_missing_perms(required_perms, request.user)
return JsonResponse(data, status=403)
canceled_by = required_perms and request.user.profile.account_kfet or None
canceled_at = timezone.now()
with transaction.atomic():
(
Transfer.objects.filter(pk__in=transfers).update(
canceled_by=canceled_by, canceled_at=canceled_at
)
)
for account in to_accounts_balances:
Account.objects.filter(pk=account.pk).update(
balance=F("balance") + to_accounts_balances[account]
)
account.refresh_from_db()
account.update_negative()
transfers = (
Transfer.objects.values("id", "canceled_at", "canceled_by__trigramme")
.filter(pk__in=transfers)
.order_by("pk")
)
data["canceled"] = list(transfers)
if transfers_already_canceled:
data["warnings"]["already_canceled"] = transfers_already_canceled
return JsonResponse(data)
class InventoryList(ListView):
queryset = (
Inventory.objects.select_related("by", "order")
.annotate(nb_articles=Count("articles"))
.order_by("-at")
)
template_name = "kfet/inventory.html"
context_object_name = "inventories"
@teamkfet_required
@kfet_password_auth
def inventory_create(request):
articles = Article.objects.select_related("category").order_by(
"-is_sold", "category__name", "name"
)
initial = []
for article in articles:
initial.append(
{
"is_sold": article.is_sold,
"article": article.pk,
"stock_old": article.stock,
"name": article.name,
"category": article.category_id,
"category__name": article.category.name,
"box_capacity": article.box_capacity or 0,
}
)
cls_formset = formset_factory(form=InventoryArticleForm, extra=0)
if request.POST:
formset = cls_formset(request.POST, initial=initial)
if not request.user.has_perm("kfet.add_inventory"):
messages.error(
request, "Permission refusée", extra_tags="permission-denied"
)
elif formset.is_valid():
with transaction.atomic():
articles = Article.objects.select_for_update()
inventory = Inventory()
inventory.by = request.user.profile.account_kfet
saved = False
for form in formset:
if form.cleaned_data["stock_new"] is not None:
if not saved:
inventory.save()
saved = True
article = articles.get(pk=form.cleaned_data["article"].pk)
stock_old = article.stock
stock_new = form.cleaned_data["stock_new"]
InventoryArticle.objects.create(
inventory=inventory,
article=article,
stock_old=stock_old,
stock_new=stock_new,
)
article.stock = stock_new
article.save()
if saved:
messages.success(request, "Inventaire créé")
return redirect("kfet.inventory")
messages.warning(request, "Bah alors ? On a rien compté ?")
else:
messages.error(request, "Pas marché")
else:
formset = cls_formset(initial=initial)
return render(request, "kfet/inventory_create.html", {"formset": formset})
class InventoryRead(DetailView):
model = Inventory
template_name = "kfet/inventory_read.html"
context_object_name = "inventory"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
output_field = DecimalField(max_digits=10, decimal_places=2, default=0)
inventory_articles = (
InventoryArticle.objects.select_related("article", "article__category")
.filter(inventory=self.object)
.annotate(
amount_error=ExpressionWrapper(
F("stock_error") * F("article__price"), output_field=output_field
)
)
.order_by("article__category__name", "article__name")
)
context["inventoryarts"] = inventory_articles
stats = inventory_articles.aggregate(
new=ExpressionWrapper(
Sum(F("stock_new") * F("article__price")), output_field=output_field
),
error=Sum("amount_error"),
old=ExpressionWrapper(
Sum(F("stock_old") * F("article__price")), output_field=output_field
),
)
context.update(
{
"total_amount_old": stats["old"],
"total_amount_new": stats["new"],
"total_amount_error": stats["error"],
}
)
return context
class InventoryDelete(PermissionRequiredMixin, DeleteView):
model = Inventory
success_url = reverse_lazy("kfet.inventory")
success_message = "Inventaire annulé avec succès !"
permission_required = "kfet.delete_inventory"
def get(self, request, *args, **kwargs):
return redirect("kfet.inventory.read", self.kwargs.get(self.pk_url_kwarg))
def delete(self, request, *args, **kwargs):
inv = self.get_object()
# On met à jour les articles dont c'est le dernier inventaire
# .get() ne marche pas avec OuterRef, donc on utilise .filter() avec [:1]
update_subquery = InventoryArticle.objects.filter(
inventory=inv, article=OuterRef("pk")
).values("stock_old")[:1]
Article.objects.annotate(last_env=Max("inventories__at")).filter(
last_env=inv.at
).update(stock=Subquery(update_subquery))
# On a tout mis à jour, on peut delete (avec un message)
messages.success(request, self.success_message)
return super().delete(request, *args, **kwargs)
# -----
# Order views
# -----
class OrderList(ListView):
queryset = Order.objects.select_related("supplier", "inventory")
template_name = "kfet/order.html"
context_object_name = "orders"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context["suppliers"] = Supplier.objects.order_by("name")
return context
@teamkfet_required
@kfet_password_auth
def order_create(request, pk):
supplier = get_object_or_404(Supplier, pk=pk)
articles = (
Article.objects.filter(suppliers=supplier.pk)
.distinct()
.select_related("category")
.order_by("-is_sold", "category__name", "name")
)
# Force hit to cache
articles = list(articles)
sales_q = (
Operation.objects.select_related("group")
.filter(article__in=articles, canceled_at=None)
.values("article")
.annotate(nb=Sum("article_nb"))
)
scale = WeekScale(last=True, n_steps=5, std_chunk=False)
chunks = scale.chunkify_qs(sales_q, field="group__at")
sales = [{d["article"]: d["nb"] for d in chunk} for chunk in chunks]
initial = []
for article in articles:
# Get sales for each 5 last weeks
v_all = [chunk.get(article.pk, 0) for chunk in sales]
# Take the 3 greatest (eg to avoid 2 weeks of vacations)
v_3max = heapq.nlargest(3, v_all)
# Get average and standard deviation
v_moy = statistics.mean(v_3max)
v_et = statistics.pstdev(v_3max, v_moy)
# Expected sales for next week
v_prev = v_moy + v_et
# We want to have 1.5 * the expected sales in stock
# (because sometimes some articles are not delivered)
c_rec_tot = max(v_prev * 1.5 - article.stock, 0)
# If ordered quantity is close enough to a level which can led to free
# boxes, we increase it to this level.
if article.box_capacity:
c_rec_temp = c_rec_tot / article.box_capacity
if c_rec_temp >= 10:
c_rec = round(c_rec_temp)
elif c_rec_temp > 5:
c_rec = 10
elif c_rec_temp > 2:
c_rec = 5
else:
c_rec = round(c_rec_temp)
initial.append(
{
"article": article.pk,
"name": article.name,
"category": article.category_id,
"category__name": article.category.name,
"stock": article.stock,
"box_capacity": article.box_capacity,
"v_all": v_all,
"v_moy": round(v_moy),
"v_et": round(v_et),
"v_prev": round(v_prev),
"c_rec": article.box_capacity and c_rec or round(c_rec_tot),
"is_sold": article.is_sold,
}
)
cls_formset = formset_factory(form=OrderArticleForm, extra=0)
if request.POST:
formset = cls_formset(request.POST, initial=initial)
if not request.user.has_perm("kfet.add_order"):
messages.error(
request, "Permission refusée", extra_tags="permission-denied"
)
elif formset.is_valid():
order = Order()
order.supplier = supplier
saved = False
for form in formset:
if form.cleaned_data["quantity_ordered"] is not None:
if not saved:
order.save()
saved = True
article = form.cleaned_data["article"]
q_ordered = form.cleaned_data["quantity_ordered"]
if article.box_capacity:
q_ordered *= article.box_capacity
OrderArticle.objects.create(
order=order, article=article, quantity_ordered=q_ordered
)
if saved:
messages.success(request, "Commande créée")
return redirect("kfet.order.read", order.pk)
messages.warning(request, "Rien commandé => Pas de commande")
else:
messages.error(request, "Corrigez les erreurs")
else:
formset = cls_formset(initial=initial)
scale.label_fmt = "S-{rev_i}"
return render(
request,
"kfet/order_create.html",
{"supplier": supplier, "formset": formset, "scale": scale},
)
class OrderRead(DetailView):
model = Order
template_name = "kfet/order_read.html"
context_object_name = "order"
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
orderarticles = (
OrderArticle.objects.select_related("article", "article__category")
.filter(order=self.object)
.order_by("article__category__name", "article__name")
)
context["orderarts"] = orderarticles
mail = (
"Bonjour,\n\nNous voudrions pour le ##DATE## à la K-Fêt de " "l'ENS Ulm :"
)
category = 0
for orderarticle in orderarticles:
if category != orderarticle.article.category:
category = orderarticle.article.category
mail += "\n"
nb = orderarticle.quantity_ordered
box = ""
if orderarticle.article.box_capacity:
nb /= orderarticle.article.box_capacity
if nb >= 2:
box = " %ss de" % orderarticle.article.box_type
else:
box = " %s de" % orderarticle.article.box_type
name = orderarticle.article.name.capitalize()
mail += "\n- %s%s %s" % (round(nb), box, name)
mail += (
"\n\nMerci d'appeler le numéro suivant lorsque les livreurs "
"sont là : ##TELEPHONE##\nCordialement,\n##PRENOM## ##NOM## "
", pour la K-Fêt de l'ENS Ulm"
)
context["mail"] = mail
return context
@teamkfet_required
@kfet_password_auth
def order_to_inventory(request, pk):
order = get_object_or_404(Order, pk=pk)
if hasattr(order, "inventory"):
raise Http404
supplier_prefetch = Prefetch(
"article__supplierarticle_set",
queryset=(
SupplierArticle.objects.filter(supplier=order.supplier).order_by("-at")
),
to_attr="supplier",
)
order_articles = (
OrderArticle.objects.filter(order=order.pk)
.select_related("article", "article__category")
.prefetch_related(supplier_prefetch)
.order_by("article__category__name", "article__name")
)
initial = []
for order_article in order_articles:
article = order_article.article
initial.append(
{
"article": article.pk,
"name": article.name,
"category": article.category_id,
"category__name": article.category.name,
"quantity_ordered": order_article.quantity_ordered,
"quantity_received": order_article.quantity_ordered,
"price_HT": article.supplier[0].price_HT,
"TVA": article.supplier[0].TVA,
"rights": article.supplier[0].rights,
}
)
cls_formset = formset_factory(OrderArticleToInventoryForm, extra=0)
if request.method == "POST":
formset = cls_formset(request.POST, initial=initial)
if not request.user.has_perm("kfet.order_to_inventory"):
messages.error(
request, "Permission refusée", extra_tags="permission-denied"
)
elif formset.is_valid():
with transaction.atomic():
inventory = Inventory.objects.create(
order=order, by=request.user.profile.account_kfet
)
new_supplierarticle = []
new_inventoryarticle = []
for form in formset:
q_received = form.cleaned_data["quantity_received"]
article = form.cleaned_data["article"]
price_HT = form.cleaned_data["price_HT"]
TVA = form.cleaned_data["TVA"]
rights = form.cleaned_data["rights"]
if any(
(
form.initial["price_HT"] != price_HT,
form.initial["TVA"] != TVA,
form.initial["rights"] != rights,
)
):
new_supplierarticle.append(
SupplierArticle(
supplier=order.supplier,
article=article,
price_HT=price_HT,
TVA=TVA,
rights=rights,
)
)
(
OrderArticle.objects.filter(
order=order, article=article
).update(quantity_received=q_received)
)
new_inventoryarticle.append(
InventoryArticle(
inventory=inventory,
article=article,
stock_old=article.stock,
stock_new=article.stock + q_received,
)
)
article.stock += q_received
if q_received > 0:
article.is_sold = True
article.save()
SupplierArticle.objects.bulk_create(new_supplierarticle)
InventoryArticle.objects.bulk_create(new_inventoryarticle)
messages.success(request, "C'est tout bon !")
return redirect("kfet.order")
else:
messages.error(request, "Corrigez les erreurs")
else:
formset = cls_formset(initial=initial)
return render(
request, "kfet/order_to_inventory.html", {"formset": formset, "order": order}
)
@method_decorator(kfet_password_auth, name="dispatch")
class SupplierUpdate(SuccessMessageMixin, UpdateView):
model = Supplier
template_name = "kfet/supplier_form.html"
fields = ["name", "address", "email", "phone", "comment"]
success_url = reverse_lazy("kfet.order")
sucess_message = "Données fournisseur mis à jour"
# Surcharge de la validation
def form_valid(self, form):
# Checking permission
if not self.request.user.has_perm("kfet.change_supplier"):
form.add_error(
None, ValidationError("Permission refusée", code="permission-denied")
)
return self.form_invalid(form)
# Updating
return super().form_valid(form)
# ==========
# Statistics
# ==========
# ---------------
# Vues génériques
# ---------------
# source : docs.djangoproject.com/fr/1.10/topics/class-based-views/mixins/
class JSONResponseMixin:
"""
A mixin that can be used to render a JSON response.
"""
def render_to_json_response(self, context, **response_kwargs):
"""
Returns a JSON response, transforming 'context' to make the payload.
"""
return JsonResponse(self.get_data(context), **response_kwargs)
def get_data(self, context):
"""
Returns an object that will be serialized as JSON by json.dumps().
"""
# Note: This is *EXTREMELY* naive; in reality, you'll need
# to do much more complex handling to ensure that arbitrary
# objects -- such as Django model instances or querysets
# -- can be serialized as JSON.
return context
class JSONDetailView(JSONResponseMixin, BaseDetailView):
"""Returns a DetailView that renders a JSON."""
def render_to_response(self, context):
return self.render_to_json_response(context)
class SingleResumeStat(JSONDetailView):
"""
Génère l'interface de sélection pour les statistiques d'un compte/article.
L'interface est constituée d'une série de boutons, qui récupèrent et graphent
des statistiques du même type, sur le même objet mais avec des arguments différents.
Attributs :
- url_stat : URL où récupérer les statistiques
- stats : liste de dictionnaires avec les clés suivantes :
- label : texte du bouton
- url_params : paramètres GET à rajouter à `url_stat`
- default : si `True`, graphe à montrer par défaut
On peut aussi définir `stats` dynamiquement, via la fonction `get_stats`.
"""
url_stat = None
stats = []
def get_stats(self):
return self.stats
def get_context_data(self, **kwargs):
# On n'hérite pas
context = {}
stats = []
# On peut avoir récupéré self.object via pk ou slug
if self.pk_url_kwarg in self.kwargs:
url_pk = getattr(self.object, self.pk_url_kwarg)
else:
url_pk = getattr(self.object, self.slug_url_kwarg)
for stat_def in self.get_stats():
url_params_d = stat_def.get("url_params", {})
if len(url_params_d) > 0:
url_params = "?{}".format(urlencode(url_params_d))
else:
url_params = ""
stats.append(
{
"label": stat_def["label"],
"url": "{url}{params}".format(
url=reverse(self.url_stat, args=[url_pk]), params=url_params
),
"default": stat_def.get("default", False),
}
)
context["stats"] = stats
return context
class UserAccountMixin:
"""
Mixin qui vérifie que le compte traité par la vue est celui de l'utilisateur·ice
actuel·le. Dans le cas contraire, renvoie un Http404.
"""
def get_object(self, *args, **kwargs):
obj = super().get_object(*args, **kwargs)
if self.request.user != obj.user:
raise Http404
return obj
class ScaleMixin(object):
"""Mixin pour utiliser les outils de `kfet.statistic`."""
def get_context_data(self, *args, **kwargs):
# On n'hérite pas
form = StatScaleForm(self.request.GET, prefix="scale")
if not form.is_valid():
raise SuspiciousOperation(
"Invalid StatScaleForm. Did someone tamper with the GET parameters ?"
)
scale_name = form.cleaned_data.pop("name")
scale_cls = SCALE_DICT.get(scale_name)
self.scale = scale_cls(**form.cleaned_data)
return {"labels": self.scale.get_labels()}
# -----------------------
# Evolution Balance perso
# -----------------------
@method_decorator(login_required, name="dispatch")
class AccountStatBalanceList(UserAccountMixin, SingleResumeStat):
"""
Menu général pour l'historique de balance d'un compte
"""
model = Account
slug_url_kwarg = "trigramme"
slug_field = "trigramme"
url_stat = "kfet.account.stat.balance"
stats = [
{"label": "Tout le temps"},
{"label": "1 an", "url_params": {"last_days": 365}},
{"label": "6 mois", "url_params": {"last_days": 183}},
{"label": "3 mois", "url_params": {"last_days": 90}, "default": True},
{"label": "30 jours", "url_params": {"last_days": 30}},
]
@method_decorator(login_required, name="dispatch")
class AccountStatBalance(UserAccountMixin, JSONDetailView):
"""
Statistiques (JSON) d'historique de balance d'un compte.
Prend en compte les opérations et transferts sur la période donnée.
"""
model = Account
slug_url_kwarg = "trigramme"
slug_field = "trigramme"
def get_changes_list(self, last_days=None, begin_date=None, end_date=None):
account = self.object
# prepare filters
if last_days is not None:
end_date = timezone.now()
begin_date = end_date - timezone.timedelta(days=last_days)
# prepare querysets
# TODO: retirer les opgroup dont tous les op sont annulées
opegroups = OperationGroup.objects.filter(on_acc=account)
transfers = Transfer.objects.filter(canceled_at=None).select_related("group")
recv_transfers = transfers.filter(to_acc=account)
sent_transfers = transfers.filter(from_acc=account)
# apply filters
if begin_date is not None:
opegroups = opegroups.filter(at__gte=begin_date)
recv_transfers = recv_transfers.filter(group__at__gte=begin_date)
sent_transfers = sent_transfers.filter(group__at__gte=begin_date)
if end_date is not None:
opegroups = opegroups.filter(at__lte=end_date)
recv_transfers = recv_transfers.filter(group__at__lte=end_date)
sent_transfers = sent_transfers.filter(group__at__lte=end_date)
# On transforme tout ça en une liste de dictionnaires sous la forme
# {'at': date,
# 'amount': changement de la balance (négatif si diminue la balance,
# positif si l'augmente),
# 'label': text descriptif,
# 'balance': état de la balance après l'action (0 pour le moment,
# sera mis à jour lors d'une
# autre passe)
# }
actions = []
actions.append(
{
"at": (begin_date or account.created_at).isoformat(),
"amount": 0,
"balance": 0,
}
)
actions.append(
{"at": (end_date or timezone.now()).isoformat(), "amount": 0, "balance": 0}
)
actions += (
[
{"at": ope_grp.at.isoformat(), "amount": ope_grp.amount, "balance": 0}
for ope_grp in opegroups
]
+ [
{"at": tr.group.at.isoformat(), "amount": tr.amount, "balance": 0}
for tr in recv_transfers
]
+ [
{"at": tr.group.at.isoformat(), "amount": -tr.amount, "balance": 0}
for tr in sent_transfers
]
)
# Maintenant on trie la liste des actions par ordre du plus récent
# an plus ancien et on met à jour la balance
if len(actions) > 1:
actions = sorted(actions, key=lambda k: k["at"], reverse=True)
actions[0]["balance"] = account.balance
for i in range(len(actions) - 1):
actions[i + 1]["balance"] = (
actions[i]["balance"] - actions[i + 1]["amount"]
)
return actions
def get_context_data(self, *args, **kwargs):
context = {}
form = AccountStatForm(self.request.GET)
if not form.is_valid():
raise SuspiciousOperation(
"Invalid AccountStatForm. Did someone tamper with the GET parameters ?"
)
changes = self.get_changes_list(**form.cleaned_data)
context["charts"] = [
{"color": "rgb(200, 20, 60)", "label": "Balance", "values": changes}
]
context["is_time_chart"] = True
if len(changes) > 0:
context["min_date"] = changes[-1]["at"]
context["max_date"] = changes[0]["at"]
# TODO: offset
return context
# ------------------------
# Consommation personnelle
# ------------------------
@method_decorator(login_required, name="dispatch")
class AccountStatOperationList(UserAccountMixin, SingleResumeStat):
"""
Menu général pour l'historique de consommation d'un compte
"""
model = Account
slug_url_kwarg = "trigramme"
slug_field = "trigramme"
url_stat = "kfet.account.stat.operation"
def get_stats(self):
scales_def = [
(
"Tout le temps",
MonthScale,
{"last": True, "begin": self.object.created_at.replace(tzinfo=None)},
False,
),
("1 an", MonthScale, {"last": True, "n_steps": 12}, False),
("3 mois", WeekScale, {"last": True, "n_steps": 13}, True),
("2 semaines", DayScale, {"last": True, "n_steps": 14}, False),
]
return scale_url_params(scales_def)
@method_decorator(login_required, name="dispatch")
class AccountStatOperation(UserAccountMixin, ScaleMixin, JSONDetailView):
"""
Statistiques (JSON) de consommation (nb d'items achetés) d'un compte.
"""
model = Account
slug_url_kwarg = "trigramme"
slug_field = "trigramme"
def get_context_data(self, *args, **kwargs):
context = super().get_context_data(*args, **kwargs)
operations = (
Operation.objects.filter(
type=Operation.PURCHASE, group__on_acc=self.object, canceled_at=None
)
.values("article_nb", "group__at")
.order_by("group__at")
)
# On compte les opérations
nb_ventes = self.scale.chunkify_qs(
operations, field="group__at", aggregate=Sum("article_nb")
)
context["charts"] = [
{
"color": "rgb(200, 20, 60)",
"label": "NB items achetés",
"values": nb_ventes,
}
]
return context
# ------------------------
# Article Statistiques Last
# ------------------------
@method_decorator(teamkfet_required, name="dispatch")
class ArticleStatSalesList(SingleResumeStat):
"""
Menu pour les statistiques de vente d'un article.
"""
model = Article
nb_default = 2
url_stat = "kfet.article.stat.sales"
def get_stats(self):
first_conso = (
Operation.objects.filter(article=self.object)
.order_by("group__at")
.values_list("group__at", flat=True)
.first()
)
if first_conso is None:
# On le crée dans le passé au cas où
first_conso = timezone.now() - timedelta(seconds=1)
scales_def = [
(
"Tout le temps",
MonthScale,
{"last": True, "begin": first_conso.strftime("%Y-%m-%d %H:%M:%S")},
False,
),
("1 an", MonthScale, {"last": True, "n_steps": 12}, False),
("3 mois", WeekScale, {"last": True, "n_steps": 13}, True),
("2 semaines", DayScale, {"last": True, "n_steps": 14}, False),
]
return scale_url_params(scales_def)
@method_decorator(teamkfet_required, name="dispatch")
class ArticleStatSales(ScaleMixin, JSONDetailView):
"""
Statistiques (JSON) de vente d'un article.
Sépare LIQ et les comptes K-Fêt, et rajoute le total.
"""
model = Article
context_object_name = "article"
def get_context_data(self, *args, **kwargs):
context = super().get_context_data(*args, **kwargs)
scale = self.scale
all_purchases = (
Operation.objects.filter(
type=Operation.PURCHASE, article=self.object, canceled_at=None
)
.values("group__at", "article_nb")
.order_by("group__at")
)
cof_accts = all_purchases.filter(group__on_acc__cofprofile__is_cof=True)
noncof_accts = all_purchases.exclude(group__on_acc__cofprofile__is_cof=True)
nb_cof = scale.chunkify_qs(
cof_accts, field="group__at", aggregate=Sum("article_nb")
)
nb_noncof = scale.chunkify_qs(
noncof_accts, field="group__at", aggregate=Sum("article_nb")
)
nb_ventes = [n1 + n2 for n1, n2 in zip(nb_cof, nb_noncof)]
context["charts"] = [
{
"color": "rgb(200, 20, 60)",
"label": "Toutes consommations",
"values": nb_ventes,
},
{"color": "rgb(54, 162, 235)", "label": "Comptes K-Fêt", "values": nb_cof},
{
"color": "rgb(255, 205, 86)",
"label": "LIQ",
"values": nb_noncof,
},
]
return context
# ---
# Autocompletion views
# ---
class AccountCreateAutocompleteView(PermissionRequiredMixin, AutocompleteView):
template_name = "kfet/search_results.html"
permission_required = "kfet.is_team"
search_composer = kfet_autocomplete
class AccountSearchAutocompleteView(PermissionRequiredMixin, AutocompleteView):
permission_required = "kfet.is_team"
search_composer = kfet_account_only_autocomplete