import heapq import statistics from collections import defaultdict from datetime import datetime, timedelta from decimal import Decimal from typing import List, Tuple from urllib.parse import urlencode from asgiref.sync import async_to_sync from channels.layers import get_channel_layer from django.conf import settings from django.contrib import messages from django.contrib.auth.decorators import login_required, permission_required from django.contrib.auth.mixins import PermissionRequiredMixin from django.contrib.auth.models import Permission, User from django.contrib.messages.views import SuccessMessageMixin from django.core.exceptions import SuspiciousOperation from django.db import transaction from django.db.models import ( Count, DecimalField, ExpressionWrapper, F, Max, OuterRef, Prefetch, Q, Subquery, Sum, ) from django.forms import ValidationError, formset_factory from django.http import ( Http404, HttpResponseBadRequest, HttpResponseForbidden, JsonResponse, ) from django.shortcuts import get_object_or_404, redirect, render from django.urls import reverse, reverse_lazy from django.utils import timezone from django.utils.decorators import method_decorator from django.views.generic import DetailView, FormView, ListView, TemplateView from django.views.generic.detail import BaseDetailView from django.views.generic.edit import CreateView, DeleteView, UpdateView from gestioncof.models import CofProfile from kfet import KFET_DELETED_TRIGRAMME from kfet.auth.decorators import kfet_password_auth from kfet.autocomplete import kfet_account_only_autocomplete, kfet_autocomplete from kfet.config import kfet_config from kfet.decorators import teamkfet_required from kfet.forms import ( AccountForm, AccountFrozenForm, AccountNoTriForm, AccountPwdForm, AccountStatForm, AccountTriForm, AddcostForm, ArticleForm, ArticleRestrictForm, CategoryForm, CheckoutForm, CheckoutRestrictForm, CheckoutStatementCreateForm, CheckoutStatementUpdateForm, CofForm, FilterHistoryForm, InventoryArticleForm, KFetConfigForm, KPsulAccountForm, KPsulCheckoutForm, KPsulOperationFormSet, KPsulOperationGroupForm, OrderArticleForm, OrderArticleToInventoryForm, StatScaleForm, TransferFormSet, UserForm, UserGroupForm, UserInfoForm, ) from kfet.models import ( Account, AccountNegative, Article, ArticleCategory, Checkout, CheckoutStatement, Inventory, InventoryArticle, Operation, OperationGroup, Order, OrderArticle, Supplier, SupplierArticle, Transfer, TransferGroup, ) from kfet.statistic import SCALE_DICT, DayScale, MonthScale, WeekScale, scale_url_params from shared.views import AutocompleteView from .auth import KFET_GENERIC_TRIGRAMME from .auth.views import ( # noqa AccountGroupCreate, AccountGroupUpdate, account_group, login_generic, ) def put_cleaned_data_in_dict(dict, form): for field in form.cleaned_data: dict[field] = form.cleaned_data[field] # ----- # Account views # ----- # Account - General @login_required @teamkfet_required def account(request): accounts = Account.objects.select_related("cofprofile__user").order_by("trigramme") return render(request, "kfet/account.html", {"accounts": accounts}) @login_required @teamkfet_required def account_is_validandfree_ajax(request): if not request.GET.get("trigramme", ""): raise Http404 trigramme = request.GET.get("trigramme") data = Account.is_validandfree(trigramme) return JsonResponse(data) # Account - Create @login_required @teamkfet_required @kfet_password_auth def account_create(request): # Enregistrement if request.method == "POST": trigramme_form = AccountTriForm(request.POST) # Peuplement des forms username = request.POST.get("username") login_clipper = request.POST.get("login_clipper") forms = get_account_create_forms( request, username=username, login_clipper=login_clipper ) account_form = forms["account_form"] cof_form = forms["cof_form"] user_form = forms["user_form"] if all( ( user_form.is_valid(), cof_form.is_valid(), trigramme_form.is_valid(), account_form.is_valid(), ) ): # Checking permission if not request.user.has_perm("kfet.add_account"): messages.error( request, "Permission refusée", extra_tags="permission-denied" ) else: data = {} # Fill data for Account.save() put_cleaned_data_in_dict(data, user_form) put_cleaned_data_in_dict(data, cof_form) try: account = trigramme_form.save(data=data) account_form = AccountNoTriForm(request.POST, instance=account) account_form.save() messages.success(request, "Compte créé : %s" % account.trigramme) return redirect("kfet.account.create") except Account.UserHasAccount as e: messages.error( request, "Cet utilisateur a déjà un compte K-Fêt : %s" % e.trigramme, ) else: initial = {"trigramme": request.GET.get("trigramme", "")} trigramme_form = AccountTriForm(initial=initial) account_form = None cof_form = None user_form = None return render( request, "kfet/account_create.html", { "trigramme_form": trigramme_form, "account_form": account_form, "cof_form": cof_form, "user_form": user_form, }, ) def account_form_set_readonly_fields(user_form, cof_form): user_form.fields["username"].widget.attrs["readonly"] = True user_form.fields["first_name"].widget.attrs["readonly"] = True user_form.fields["last_name"].widget.attrs["readonly"] = True user_form.fields["email"].widget.attrs["readonly"] = True cof_form.fields["login_clipper"].widget.attrs["readonly"] = True cof_form.fields["departement"].widget.attrs["readonly"] = True cof_form.fields["is_cof"].widget.attrs["disabled"] = True def get_account_create_forms( request=None, username=None, login_clipper=None, fullname=None ): user = None clipper = False if login_clipper and (login_clipper == username or not username): # à partir d'un clipper # le user associé à ce clipper ne devrait pas encore exister clipper = True try: # Vérification que clipper ne soit pas déjà dans User user = User.objects.get(username=login_clipper) # Ici, on nous a menti, le user existe déjà username = user.username clipper = False except User.DoesNotExist: # Clipper (sans user déjà existant) # UserForm - Prefill user_initial = { "username": login_clipper, "email": "%s@clipper.ens.fr" % login_clipper, } if fullname: # Prefill du nom et prénom names = fullname.split() # Le premier, c'est le prénom user_initial["first_name"] = names[0] if len(names) > 1: # Si d'autres noms -> tous dans le nom de famille user_initial["last_name"] = " ".join(names[1:]) # CofForm - Prefill cof_initial = {"login_clipper": login_clipper} # Form créations if request: user_form = UserForm(request.POST, initial=user_initial) cof_form = CofForm(request.POST, initial=cof_initial) else: user_form = UserForm(initial=user_initial) cof_form = CofForm(initial=cof_initial) # Protection (read-only) des champs username et login_clipper account_form_set_readonly_fields(user_form, cof_form) if username and not clipper: try: user = User.objects.get(username=username) # le user existe déjà # récupération du profil cof (cof, _) = CofProfile.objects.get_or_create(user=user) # UserForm + CofForm - Création à partir des instances existantes if request: user_form = UserForm(request.POST, instance=user) cof_form = CofForm(request.POST, instance=cof) else: user_form = UserForm(instance=user) cof_form = CofForm(instance=cof) # Protection (read-only) des champs username, login_clipper et is_cof account_form_set_readonly_fields(user_form, cof_form) except User.DoesNotExist: # le username donnée n'existe pas -> Création depuis rien # (éventuellement en cours avec erreurs précédemment) pass if not user and not clipper: # connaît pas du tout, faut tout remplir if request: user_form = UserForm(request.POST) cof_form = CofForm(request.POST) else: user_form = UserForm() cof_form = CofForm() # mais on laisse le username en écriture cof_form.fields["login_clipper"].widget.attrs["readonly"] = True cof_form.fields["is_cof"].widget.attrs["disabled"] = True if request: account_form = AccountNoTriForm(request.POST) else: account_form = AccountNoTriForm() return {"account_form": account_form, "cof_form": cof_form, "user_form": user_form} @login_required @teamkfet_required def account_create_ajax(request, username=None, login_clipper=None, fullname=None): forms = get_account_create_forms( request=None, username=username, login_clipper=login_clipper, fullname=fullname ) return render( request, "kfet/account_create_form.html", { "account_form": forms["account_form"], "cof_form": forms["cof_form"], "user_form": forms["user_form"], }, ) # Account - Read @login_required def account_read(request, trigramme): account = get_object_or_404(Account, trigramme=trigramme) # Checking permissions if not account.readable or ( not request.user.has_perm("kfet.is_team") and request.user != account.user ): raise Http404 addcosts = ( OperationGroup.objects.filter(opes__addcost_for=account, opes__canceled_at=None) .extra({"date": "date(at)"}) .values("date") .annotate(sum_addcosts=Sum("opes__addcost_amount")) .order_by("-date") ) return render( request, "kfet/account_read.html", {"account": account, "addcosts": addcosts} ) # Account - Update @teamkfet_required @kfet_password_auth def account_update(request, trigramme): account = get_object_or_404(Account, trigramme=trigramme) # Checking permissions if not account.editable: # Plus de leak de trigramme ! return HttpResponseForbidden user_info_form = UserInfoForm(instance=account.user) account_form = AccountForm(instance=account) group_form = UserGroupForm(instance=account.user) frozen_form = AccountFrozenForm(request.POST, instance=account) pwd_form = AccountPwdForm() if request.method == "POST": self_update = request.user == account.user account_form = AccountForm(request.POST, instance=account) group_form = UserGroupForm(request.POST, instance=account.user) frozen_form = AccountFrozenForm(request.POST, instance=account) pwd_form = AccountPwdForm(request.POST, account=account) forms = [] warnings = [] if self_update or request.user.has_perm("kfet.change_account"): forms.append(account_form) elif account_form.has_changed(): warnings.append("compte") if request.user.has_perm("kfet.manage_perms"): forms.append(group_form) forms.append(frozen_form) elif group_form.has_changed(): warnings.append("statut d'équipe") # Il ne faut pas valider `pwd_form` si elle est inchangée if pwd_form.has_changed(): if self_update or request.user.has_perm("kfet.change_account_password"): forms.append(pwd_form) else: warnings.append("mot de passe") # Updating account info if forms == []: messages.error( request, "Informations non mises à jour : permission refusée", extra_tags="permission-denied", ) else: if all(form.is_valid() for form in forms): for form in forms: form.save() if len(warnings): messages.warning( request, "Permissions insuffisantes pour modifier" " les informations suivantes : {}.".format(", ".join(warnings)), ) if self_update: messages.success(request, "Vos informations ont été mises à jour !") else: messages.success( request, "Informations du compte %s mises à jour" % account.trigramme, ) return redirect("kfet.account.read", account.trigramme) else: messages.error( request, "Informations non mises à jour : corrigez les erreurs" ) return render( request, "kfet/account_update.html", { "user_info_form": user_info_form, "account": account, "account_form": account_form, "frozen_form": frozen_form, "group_form": group_form, "pwd_form": pwd_form, }, ) # Account - Delete class AccountDelete(PermissionRequiredMixin, DeleteView): model = Account slug_field = "trigramme" slug_url_kwarg = "trigramme" success_url = reverse_lazy("kfet.account") success_message = "Compte supprimé avec succès !" permission_required = "kfet.delete_account" http_method_names = ["post"] def delete(self, request, *args, **kwargs): self.object = self.get_object() if self.object.balance >= 0.01: messages.error( request, "Impossible de supprimer un compte " "avec une balance strictement positive !", ) return redirect("kfet.account.read", self.object.trigramme) if self.object.trigramme in [ "LIQ", KFET_GENERIC_TRIGRAMME, KFET_DELETED_TRIGRAMME, "#13", ]: messages.error(request, "Impossible de supprimer un trigramme protégé !") return redirect("kfet.account.read", self.object.trigramme) # SuccessMessageMixin does not work with DeleteView, see : # https://code.djangoproject.com/ticket/21926 messages.success(request, self.success_message) return super().delete(request, *args, **kwargs) class AccountNegativeList(ListView): queryset = ( AccountNegative.objects.select_related("account", "account__cofprofile__user") .filter(account__balance__lt=0) .exclude(account__trigramme="#13") ) template_name = "kfet/account_negative.html" context_object_name = "negatives" def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) balances = (neg.account.balance for neg in self.object_list) context["negatives_sum"] = sum(balances) return context # ----- # Checkout views # ----- # Checkout - General class CheckoutList(ListView): model = Checkout template_name = "kfet/checkout.html" context_object_name = "checkouts" # Checkout - Create @method_decorator(kfet_password_auth, name="dispatch") class CheckoutCreate(SuccessMessageMixin, CreateView): model = Checkout template_name = "kfet/checkout_create.html" form_class = CheckoutForm success_message = "Nouvelle caisse : %(name)s" # Surcharge de la validation def form_valid(self, form): # Checking permission if not self.request.user.has_perm("kfet.add_checkout"): form.add_error( None, ValidationError("Permission refusée", code="permission-denied") ) return self.form_invalid(form) # Creating form.instance.created_by = self.request.user.profile.account_kfet form.save() return super().form_valid(form) # Checkout - Read class CheckoutRead(DetailView): model = Checkout template_name = "kfet/checkout_read.html" context_object_name = "checkout" def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context["statements"] = context["checkout"].statements.order_by("-at") return context # Checkout - Update @method_decorator(kfet_password_auth, name="dispatch") class CheckoutUpdate(SuccessMessageMixin, UpdateView): model = Checkout template_name = "kfet/checkout_update.html" form_class = CheckoutRestrictForm success_message = "Informations mises à jour pour la caisse : %(name)s" # Surcharge de la validation def form_valid(self, form): # Checking permission if not self.request.user.has_perm("kfet.change_checkout"): form.add_error( None, ValidationError("Permission refusée", code="permission-denied") ) return self.form_invalid(form) # Updating return super().form_valid(form) # ----- # Checkout Statement views # ----- # Checkout Statement - General class CheckoutStatementList(ListView): model = CheckoutStatement queryset = CheckoutStatement.objects.order_by("-at") template_name = "kfet/checkoutstatement.html" context_object_name = "checkoutstatements" # Checkout Statement - Create def getAmountTaken(data): return Decimal( data.taken_001 * 0.01 + data.taken_002 * 0.02 + data.taken_005 * 0.05 + data.taken_01 * 0.1 + data.taken_02 * 0.2 + data.taken_05 * 0.5 + data.taken_1 * 1 + data.taken_2 * 2 + data.taken_5 * 5 + data.taken_10 * 10 + data.taken_20 * 20 + data.taken_50 * 50 + data.taken_100 * 100 + data.taken_200 * 200 + data.taken_500 * 500 + float(data.taken_cheque) ) def getAmountBalance(data): return Decimal( data["balance_001"] * 0.01 + data["balance_002"] * 0.02 + data["balance_005"] * 0.05 + data["balance_01"] * 0.1 + data["balance_02"] * 0.2 + data["balance_05"] * 0.5 + data["balance_1"] * 1 + data["balance_2"] * 2 + data["balance_5"] * 5 + data["balance_10"] * 10 + data["balance_20"] * 20 + data["balance_50"] * 50 + data["balance_100"] * 100 + data["balance_200"] * 200 + data["balance_500"] * 500 ) @method_decorator(kfet_password_auth, name="dispatch") class CheckoutStatementCreate(SuccessMessageMixin, CreateView): model = CheckoutStatement template_name = "kfet/checkoutstatement_create.html" form_class = CheckoutStatementCreateForm success_message = "Nouveau relevé : %(checkout)s - %(at)s" def get_success_url(self): return reverse_lazy( "kfet.checkout.read", kwargs={"pk": self.kwargs["pk_checkout"]} ) def get_success_message(self, cleaned_data): return self.success_message % dict( cleaned_data, checkout=self.object.checkout.name, at=self.object.at ) def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) checkout = Checkout.objects.get(pk=self.kwargs["pk_checkout"]) context["checkout"] = checkout return context def form_valid(self, form): # Checking permission if not self.request.user.has_perm("kfet.add_checkoutstatement"): form.add_error( None, ValidationError("Permission refusée", code="permission-denied") ) return self.form_invalid(form) # Creating form.instance.amount_taken = getAmountTaken(form.instance) if not form.instance.not_count: form.instance.balance_new = getAmountBalance(form.cleaned_data) form.instance.checkout_id = self.kwargs["pk_checkout"] form.instance.by = self.request.user.profile.account_kfet return super().form_valid(form) @method_decorator(kfet_password_auth, name="dispatch") class CheckoutStatementUpdate(SuccessMessageMixin, UpdateView): model = CheckoutStatement template_name = "kfet/checkoutstatement_update.html" form_class = CheckoutStatementUpdateForm success_message = "Relevé modifié" def get_success_url(self): return reverse_lazy( "kfet.checkout.read", kwargs={"pk": self.kwargs["pk_checkout"]} ) def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) checkout = Checkout.objects.get(pk=self.kwargs["pk_checkout"]) context["checkout"] = checkout return context def form_valid(self, form): # Checking permission if not self.request.user.has_perm("kfet.change_checkoutstatement"): form.add_error( None, ValidationError("Permission refusée", code="permission-denied") ) return self.form_invalid(form) # Updating form.instance.amount_taken = getAmountTaken(form.instance) return super().form_valid(form) # ----- # Category views # ----- # Category - General class CategoryList(ListView): queryset = ArticleCategory.objects.prefetch_related("articles").order_by("name") template_name = "kfet/category.html" context_object_name = "categories" # Category - Update @method_decorator(kfet_password_auth, name="dispatch") class CategoryUpdate(SuccessMessageMixin, UpdateView): model = ArticleCategory template_name = "kfet/category_update.html" form_class = CategoryForm success_url = reverse_lazy("kfet.category") success_message = "Informations mises à jour pour la catégorie : %(name)s" # Surcharge de la validation def form_valid(self, form): # Checking permission if not self.request.user.has_perm("kfet.change_articlecategory"): form.add_error( None, ValidationError("Permission refusée", code="permission-denied") ) return self.form_invalid(form) # Updating return super().form_valid(form) # ----- # Article views # ----- # Article - General class ArticleList(ListView): queryset = ( Article.objects.select_related("category") .prefetch_related( Prefetch( "inventories", queryset=Inventory.objects.order_by("-at"), to_attr="inventory", ) ) .order_by("category__name", "-is_sold", "name") ) template_name = "kfet/article.html" context_object_name = "articles" def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) articles = context[self.context_object_name] context["nb_articles"] = len(articles) context[self.context_object_name] = articles.filter(is_sold=True) context["not_sold_articles"] = articles.filter(is_sold=False) return context # Article - Create @method_decorator(kfet_password_auth, name="dispatch") class ArticleCreate(SuccessMessageMixin, CreateView): model = Article template_name = "kfet/article_create.html" form_class = ArticleForm success_message = "Nouvel item : %(category)s - %(name)s" # Surcharge de la validation def form_valid(self, form): # Checking permission if not self.request.user.has_perm("kfet.add_article"): form.add_error( None, ValidationError("Permission refusée", code="permission-denied") ) return self.form_invalid(form) # Save ici pour save le manytomany suppliers article = form.save() # Save des suppliers déjà existant for supplier in form.cleaned_data["suppliers"]: SupplierArticle.objects.create(article=article, supplier=supplier) # Nouveau supplier supplier_new = form.cleaned_data["supplier_new"].strip() if supplier_new: supplier, created = Supplier.objects.get_or_create(name=supplier_new) if created: SupplierArticle.objects.create(article=article, supplier=supplier) # Inventaire avec stock initial inventory = Inventory() inventory.by = self.request.user.profile.account_kfet inventory.save() InventoryArticle.objects.create( inventory=inventory, article=article, stock_old=article.stock, stock_new=article.stock, ) # Creating return super().form_valid(form) # Article - Read class ArticleRead(DetailView): model = Article template_name = "kfet/article_read.html" context_object_name = "article" def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) inventoryarts = ( InventoryArticle.objects.filter(article=self.object) .select_related("inventory") .order_by("-inventory__at") ) context["inventoryarts"] = inventoryarts supplierarts = ( SupplierArticle.objects.filter(article=self.object) .select_related("supplier") .order_by("-at") ) context["supplierarts"] = supplierarts return context # Article - Update @method_decorator(kfet_password_auth, name="dispatch") class ArticleUpdate(SuccessMessageMixin, UpdateView): model = Article template_name = "kfet/article_update.html" form_class = ArticleRestrictForm success_message = "Informations mises à jour pour l'article : %(name)s" # Surcharge de la validation def form_valid(self, form): # Checking permission if not self.request.user.has_perm("kfet.change_article"): form.add_error( None, ValidationError("Permission refusée", code="permission-denied") ) return self.form_invalid(form) # Save ici pour save le manytomany suppliers article = form.save() # Save des suppliers déjà existant for supplier in form.cleaned_data["suppliers"]: if supplier not in article.suppliers.all(): SupplierArticle.objects.create(article=article, supplier=supplier) # On vire les suppliers désélectionnés for supplier in article.suppliers.all(): if supplier not in form.cleaned_data["suppliers"]: SupplierArticle.objects.filter( article=article, supplier=supplier ).delete() # Nouveau supplier supplier_new = form.cleaned_data["supplier_new"].strip() if supplier_new: supplier, created = Supplier.objects.get_or_create(name=supplier_new) if created: SupplierArticle.objects.create(article=article, supplier=supplier) # Updating return super().form_valid(form) class ArticleDelete(PermissionRequiredMixin, DeleteView): model = Article success_url = reverse_lazy("kfet.article") success_message = "Article supprimé avec succès !" permission_required = "kfet.delete_article" def get(self, request, *args, **kwargs): return redirect("kfet.article.read", self.kwargs.get(self.pk_url_kwarg)) def delete(self, request, *args, **kwargs): messages.success(request, self.success_message) return super().delete(request, *args, **kwargs) # ----- # K-Psul # ----- @teamkfet_required def kpsul(request): data = {} data["operationgroup_form"] = KPsulOperationGroupForm() data["trigramme_form"] = KPsulAccountForm() data["checkout_form"] = KPsulCheckoutForm() data["operation_formset"] = KPsulOperationFormSet(queryset=Operation.objects.none()) return render(request, "kfet/kpsul.html", data) @teamkfet_required def kpsul_get_settings(request): addcost_for = kfet_config.addcost_for data = { "subvention_cof": kfet_config.subvention_cof, "addcost_for": addcost_for and addcost_for.trigramme or "", "addcost_amount": kfet_config.addcost_amount, } return JsonResponse(data) @teamkfet_required def account_read_json(request, trigramme): account = get_object_or_404(Account, trigramme=trigramme) if not account.readable: raise Http404 data = { "id": account.pk, "name": account.name, "email": account.email, "is_cof": account.is_cof, "promo": account.promo, "balance": account.balance, "is_frozen": account.is_frozen, "departement": account.departement, "nickname": account.nickname, "trigramme": account.trigramme, } return JsonResponse(data) @teamkfet_required def kpsul_checkout_data(request): pk = request.POST.get("pk", 0) if not pk: pk = 0 data = ( Checkout.objects.annotate( last_statement_by_first_name=F( "statements__by__cofprofile__user__first_name" ), last_statement_by_last_name=F( "statements__by__cofprofile__user__last_name" ), last_statement_by_trigramme=F("statements__by__trigramme"), last_statement_balance=F("statements__balance_new"), last_statement_at=F("statements__at"), ) .select_related( "statements" "statements__by", "statements__by__cofprofile__user" ) .filter(pk=pk) .order_by("statements__at") .values( "id", "name", "balance", "valid_from", "valid_to", "last_statement_balance", "last_statement_at", "last_statement_by_trigramme", "last_statement_by_last_name", "last_statement_by_first_name", ) .last() ) if data is None: raise Http404 return JsonResponse(data) @teamkfet_required @kfet_password_auth def kpsul_update_addcost(request): addcost_form = AddcostForm(request.POST) data = {"errors": []} if not addcost_form.is_valid(): for field, errors in addcost_form.errors.items(): for error in errors: data["errors"].append({"code": f"invalid_{field}", "message": error}) return JsonResponse(data, status=400) required_perms = ["kfet.manage_addcosts"] if not request.user.has_perms(required_perms): data["missing_perms"] = get_missing_perms(required_perms, request.user) return JsonResponse(data, status=403) trigramme = addcost_form.cleaned_data["trigramme"] account = trigramme and Account.objects.get(trigramme=trigramme) or None amount = addcost_form.cleaned_data["amount"] kfet_config.set(addcost_for=account, addcost_amount=amount) data = { "addcost": {"for": account and account.trigramme or None, "amount": amount}, "type": "kpsul", } channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)("kfet.kpsul", data) return JsonResponse(data) def get_missing_perms(required_perms: List[str], user: User) -> List[str]: def get_perm_name(app_label: str, codename: str) -> str: return Permission.objects.values_list("name", flat=True).get( codename=codename, content_type__app_label=app_label ) missing_perms = [ get_perm_name(*perm.split(".")) for perm in required_perms if not user.has_perm(perm) ] return missing_perms @teamkfet_required @kfet_password_auth def kpsul_perform_operations(request): # Initializing response data data = {"errors": []} # Checking operationgroup operationgroup_form = KPsulOperationGroupForm(request.POST) if not operationgroup_form.is_valid(): for field in operationgroup_form.errors: verbose_field, feminin = ( ("compte", "") if field == "on_acc" else ("caisse", "e") ) data["errors"].append( { "code": f"invalid_{field}", "message": f"Pas de {verbose_field} sélectionné{feminin}", } ) # Checking operation_formset operation_formset = KPsulOperationFormSet(request.POST) if not operation_formset.is_valid(): data["errors"].append( { "code": "invalid_formset", "message": "Formulaire d'opérations vide ou invalide", } ) # Returning BAD REQUEST if errors if data["errors"]: return JsonResponse(data, status=400) # Pre-saving (no commit) operationgroup = operationgroup_form.save(commit=False) operations = operation_formset.save(commit=False) on_acc = operationgroup.on_acc # Retrieving COF grant cof_grant = kfet_config.subvention_cof # Retrieving addcosts data addcost_amount = kfet_config.addcost_amount addcost_for = kfet_config.addcost_for # Initializing vars required_perms = set() # Required perms to perform all operations cof_grant_divisor = 1 + cof_grant / 100 to_addcost_for_balance = 0 # For balance of addcost_for to_checkout_balance = 0 # For balance of selected checkout to_articles_stocks = defaultdict(lambda: 0) # For stocks articles is_addcost = all((addcost_for, addcost_amount, addcost_for != on_acc)) need_comment = on_acc.need_comment if on_acc.is_frozen: data["errors"].append( {"code": "frozen_acc", "message": f"Le compte {on_acc.trigramme} est gelé"} ) # Filling data of each operations # + operationgroup + calculating other stuffs for operation in operations: if operation.type == Operation.PURCHASE: operation.amount = -operation.article.price * operation.article_nb if is_addcost & operation.article.category.has_addcost: operation.addcost_for = addcost_for operation.addcost_amount = addcost_amount * operation.article_nb operation.amount -= operation.addcost_amount to_addcost_for_balance += operation.addcost_amount if on_acc.is_cash: to_checkout_balance += -operation.amount if on_acc.is_cof and operation.article.category.has_reduction: if is_addcost and operation.article.category.has_addcost: operation.addcost_amount /= cof_grant_divisor operation.amount = operation.amount / cof_grant_divisor to_articles_stocks[operation.article] -= operation.article_nb else: if on_acc.is_cash: data["errors"].append( { "code": "invalid_liq", "message": ( "Impossible de compter autre chose que des achats sur LIQ" ), } ) if operation.type != Operation.EDIT: to_checkout_balance += operation.amount operationgroup.amount += operation.amount if operation.type == Operation.DEPOSIT: required_perms.add("kfet.perform_deposit") if operation.type == Operation.EDIT: required_perms.add("kfet.edit_balance_account") need_comment = True if on_acc.is_cof: to_addcost_for_balance = to_addcost_for_balance / cof_grant_divisor (perms, stop) = on_acc.perms_to_perform_operation(amount=operationgroup.amount) required_perms |= perms if stop: data["errors"].append( { "code": "negative", "message": f"Le compte {on_acc.trigramme} a un solde insuffisant.", } ) if need_comment: operationgroup.comment = operationgroup.comment.strip() if not operationgroup.comment: data["need_comment"] = True if data["errors"] or "need_comment" in data: return JsonResponse(data, status=400) if not request.user.has_perms(required_perms): data["missing_perms"] = get_missing_perms(required_perms, request.user) return JsonResponse(data, status=403) # If 1 perm is required, filling who perform the operations if required_perms: operationgroup.valid_by = request.user.profile.account_kfet # Filling cof status for statistics operationgroup.is_cof = on_acc.is_cof # Starting transaction to ensure data consistency with transaction.atomic(): # If not cash account, # saving account's balance and adding to Negative if not in if not on_acc.is_cash: ( Account.objects.filter(pk=on_acc.pk).update( balance=F("balance") + operationgroup.amount ) ) on_acc.refresh_from_db() on_acc.update_negative() # Updating checkout's balance if to_checkout_balance: Checkout.objects.filter(pk=operationgroup.checkout.pk).update( balance=F("balance") + to_checkout_balance ) # Saving addcost_for with new balance if there is one if is_addcost and to_addcost_for_balance: Account.objects.filter(pk=addcost_for.pk).update( balance=F("balance") + to_addcost_for_balance ) # Saving operation group operationgroup.save() # Filling operationgroup id for each operations and saving for operation in operations: operation.group = operationgroup operation.save() # Updating articles stock for article in to_articles_stocks: Article.objects.filter(pk=article.pk).update( stock=F("stock") + to_articles_stocks[article] ) # Websocket data websocket_data = {"type": "kpsul"} websocket_data["groups"] = [ { "add": True, "type": "operation", "id": operationgroup.pk, "amount": operationgroup.amount, "checkout__name": operationgroup.checkout.name, "at": operationgroup.at, "is_cof": operationgroup.is_cof, "comment": operationgroup.comment, "valid_by__trigramme": ( operationgroup.valid_by and operationgroup.valid_by.trigramme or None ), "on_acc__trigramme": on_acc.trigramme, "entries": [], } ] for operation in operations: ope_data = { "id": operation.pk, "type": operation.type, "amount": operation.amount, "addcost_amount": operation.addcost_amount, "addcost_for__trigramme": ( operation.addcost_for and addcost_for.trigramme or None ), "article__name": (operation.article and operation.article.name or None), "article_nb": operation.article_nb, "group_id": operationgroup.pk, "canceled_by__trigramme": None, "canceled_at": None, } websocket_data["groups"][0]["entries"].append(ope_data) # Need refresh from db cause we used update on queryset operationgroup.checkout.refresh_from_db() websocket_data["checkouts"] = [ {"id": operationgroup.checkout.pk, "balance": operationgroup.checkout.balance} ] websocket_data["articles"] = [] # Need refresh from db cause we used update on querysets articles_pk = [article.pk for article in to_articles_stocks] articles = Article.objects.values("id", "stock").filter(pk__in=articles_pk) for article in articles: websocket_data["articles"].append( {"id": article["id"], "stock": article["stock"]} ) channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)("kfet.kpsul", websocket_data) return JsonResponse(data) @teamkfet_required @kfet_password_auth def cancel_operations(request): # Pour la réponse data = {"canceled": [], "warnings": {}, "errors": []} # Checking if BAD REQUEST (opes_pk not int or not existing) try: # Set pour virer les doublons opes_post = set( map(int, filter(None, request.POST.getlist("operations[]", []))) ) except ValueError: data["errors"].append( {"code": "invalid_request", "message": "Requête invalide !"} ) return JsonResponse(data, status=400) opes_all = Operation.objects.select_related( "group", "group__on_acc", "group__on_acc__negative" ).filter(pk__in=opes_post) opes_pk = [ope.pk for ope in opes_all] opes_notexisting = [ope for ope in opes_post if ope not in opes_pk] if opes_notexisting: data["errors"].append( { "code": "cancel_missing", "message": "Opérations inexistantes : {}".format( ", ".join(map(str, opes_notexisting)) ), } ) return JsonResponse(data, status=400) opes_already_canceled = [] # Déjà annulée opes = [] # Pas déjà annulée required_perms = set() cancel_duration = kfet_config.cancel_duration # Modifs à faire sur les balances des comptes to_accounts_balances = defaultdict(int) # ------ sur les montants des groupes d'opé to_groups_amounts = defaultdict(int) # ------ sur les balances de caisses to_checkouts_balances = defaultdict(int) # ------ sur les stocks d'articles to_articles_stocks = defaultdict(int) for ope in opes_all: if ope.canceled_at: # Opération déjà annulée, va pour un warning en Response opes_already_canceled.append(ope.pk) else: opes.append(ope.pk) # Si opé il y a plus de CANCEL_DURATION, permission requise if ope.group.at + cancel_duration < timezone.now(): required_perms.add("kfet.cancel_old_operations") # Calcul de toutes modifs à faire en cas de validation # Pour les balances de comptes if not ope.group.on_acc.is_cash: to_accounts_balances[ope.group.on_acc] -= ope.amount if ope.addcost_for and ope.addcost_amount: to_accounts_balances[ope.addcost_for] -= ope.addcost_amount # Pour les groupes d'opés to_groups_amounts[ope.group] -= ope.amount # Pour les balances de caisses # Les balances de caisses dont il y a eu un relevé depuis la date # de la commande ne doivent pas être modifiées # TODO : Prendre en compte le dernier relevé où la caisse a été # comptée et donc modifier les balance_old (et amount_error) # des relevés suivants. # Note : Dans le cas où un CheckoutStatement est mis à jour # par `.save()`, amount_error est recalculé automatiquement, # ce qui n'est pas le cas en faisant un update sur queryset # TODO ? : Maj les balance_old de relevés pour modifier l'erreur last_statement = ( CheckoutStatement.objects.filter(checkout=ope.group.checkout) .order_by("at") .last() ) if not last_statement or last_statement.at < ope.group.at: if ope.is_checkout: if ope.group.on_acc.is_cash: to_checkouts_balances[ope.group.checkout] -= -ope.amount else: to_checkouts_balances[ope.group.checkout] -= ope.amount # Pour les stocks d'articles # Les stocks d'articles dont il y a eu un inventaire depuis la date # de la commande ne doivent pas être modifiés # TODO : Prendre en compte le dernier inventaire où le stock a bien # été compté (pas dans le cas d'une livraison). # Note : si InventoryArticle est maj par .save(), stock_error # est recalculé automatiquement if ope.article and ope.article_nb: last_stock = ( InventoryArticle.objects.select_related("inventory") .filter(article=ope.article) .order_by("inventory__at") .last() ) if not last_stock or last_stock.inventory.at < ope.group.at: to_articles_stocks[ope.article] += ope.article_nb if not opes: data["warnings"]["already_canceled"] = opes_already_canceled return JsonResponse(data) negative_accounts = [] # Checking permissions or stop for account in to_accounts_balances: (perms, stop) = account.perms_to_perform_operation( amount=to_accounts_balances[account] ) required_perms |= perms if stop: negative_accounts.append(account.trigramme) if negative_accounts: data["errors"].append( { "code": "negative", "message": "Solde insuffisant pour les comptes suivants : {}".format( ", ".join(negative_accounts) ), } ) return JsonResponse(data, status=400) if not request.user.has_perms(required_perms): data["missing_perms"] = get_missing_perms(required_perms, request.user) return JsonResponse(data, status=403) canceled_by = required_perms and request.user.profile.account_kfet or None canceled_at = timezone.now() with transaction.atomic(): ( Operation.objects.filter(pk__in=opes).update( canceled_by=canceled_by, canceled_at=canceled_at ) ) for account in to_accounts_balances: ( Account.objects.filter(pk=account.pk).update( balance=F("balance") + to_accounts_balances[account] ) ) if not account.is_cash: # Should always be true, but we want to be sure account.refresh_from_db() account.update_negative() for checkout in to_checkouts_balances: Checkout.objects.filter(pk=checkout.pk).update( balance=F("balance") + to_checkouts_balances[checkout] ) for group in to_groups_amounts: OperationGroup.objects.filter(pk=group.pk).update( amount=F("amount") + to_groups_amounts[group] ) for article in to_articles_stocks: Article.objects.filter(pk=article.pk).update( stock=F("stock") + to_articles_stocks[article] ) # Need refresh from db cause we used update on querysets. # Sort objects by pk to get deterministic responses. opegroups_pk = [opegroup.pk for opegroup in to_groups_amounts] opegroups = ( OperationGroup.objects.values("id", "amount", "is_cof") .filter(pk__in=opegroups_pk) .order_by("pk") ) opes = ( Operation.objects.values("id", "canceled_at", "canceled_by__trigramme") .filter(pk__in=opes) .order_by("pk") ) checkouts_pk = [checkout.pk for checkout in to_checkouts_balances] checkouts = ( Checkout.objects.values("id", "balance") .filter(pk__in=checkouts_pk) .order_by("pk") ) articles_pk = [article.pk for articles in to_articles_stocks] articles = Article.objects.values("id", "stock").filter(pk__in=articles_pk) # Websocket data websocket_data = {"checkouts": [], "articles": [], "type": "kpsul"} for checkout in checkouts: websocket_data["checkouts"].append( {"id": checkout["id"], "balance": checkout["balance"]} ) for article in articles: websocket_data["articles"].append( {"id": article["id"], "stock": article["stock"]} ) channel_layer = get_channel_layer() async_to_sync(channel_layer.group_send)("kfet.kpsul", websocket_data) data["canceled"] = list(opes) data["opegroups_to_update"] = list(opegroups) if opes_already_canceled: data["warnings"]["already_canceled"] = opes_already_canceled return JsonResponse(data) def get_history_limit(user) -> Tuple[datetime, datetime]: """returns a tuple of 2 dates - the earliest date the given user can view history of any account - the earliest date the given user can view history of special accounts (LIQ and #13)""" now = timezone.now() if user.has_perm("kfet.access_old_history"): return ( now - settings.KFET_HISTORY_LONG_DATE_LIMIT, settings.KFET_HISTORY_NO_DATE_LIMIT, ) if user.has_perm("kfet.is_team"): limit = now - settings.KFET_HISTORY_DATE_LIMIT return limit, limit # should not happen - future earliest date future = now + timedelta(days=1) return future, future @login_required def history_json(request): # Récupération des paramètres form = FilterHistoryForm(request.GET) if not form.is_valid(): return HttpResponseBadRequest() start = form.cleaned_data["start"] end = form.cleaned_data["end"] account = form.cleaned_data["account"] checkout = form.cleaned_data["checkout"] transfers_only = form.cleaned_data["transfers_only"] opes_only = form.cleaned_data["opes_only"] # Construction de la requête (sur les transferts) pour le prefetch transfer_queryset_prefetch = Transfer.objects.select_related( "from_acc", "to_acc", "canceled_by" ) # Le check sur les comptes est dans le prefetch pour les transferts if account: transfer_queryset_prefetch = transfer_queryset_prefetch.filter( Q(from_acc=account) | Q(to_acc=account) ) if not request.user.has_perm("kfet.is_team"): try: acc = request.user.profile.account_kfet transfer_queryset_prefetch = transfer_queryset_prefetch.filter( Q(from_acc=acc) | Q(to_acc=acc) ) except Account.DoesNotExist: return JsonResponse({}, status=403) transfer_prefetch = Prefetch( "transfers", queryset=transfer_queryset_prefetch, to_attr="filtered_transfers" ) # Construction de la requête (sur les opérations) pour le prefetch ope_queryset_prefetch = Operation.objects.select_related( "article", "canceled_by", "addcost_for" ) ope_prefetch = Prefetch("opes", queryset=ope_queryset_prefetch) # Construction de la requête principale opegroups = ( OperationGroup.objects.prefetch_related(ope_prefetch) .select_related("on_acc", "valid_by") .order_by("at") ) transfergroups = ( TransferGroup.objects.prefetch_related(transfer_prefetch) .select_related("valid_by") .order_by("at") ) # limite l'accès à l'historique plus vieux que settings.KFET_HISTORY_DATE_LIMIT limit_date = True # Application des filtres if start: opegroups = opegroups.filter(at__gte=start) transfergroups = transfergroups.filter(at__gte=start) if end: opegroups = opegroups.filter(at__lt=end) transfergroups = transfergroups.filter(at__lt=end) if checkout: opegroups = opegroups.filter(checkout=checkout) transfergroups = TransferGroup.objects.none() if transfers_only: opegroups = OperationGroup.objects.none() if opes_only: transfergroups = TransferGroup.objects.none() if account: opegroups = opegroups.filter(on_acc=account) if account.user == request.user: limit_date = False # pas de limite de date sur son propre historique # Un non-membre de l'équipe n'a que accès à son historique elif not request.user.has_perm("kfet.is_team"): # un non membre de la kfet doit avoir le champ account # pré-rempli, cette requête est douteuse return JsonResponse({}, status=403) if limit_date: # limiter l'accès à l'historique ancien pour confidentialité earliest_date, earliest_date_no_limit = get_history_limit(request.user) if ( account and account.trigramme in settings.KFET_HISTORY_NO_DATE_LIMIT_TRIGRAMMES ): earliest_date = earliest_date_no_limit opegroups = opegroups.filter(at__gte=earliest_date) transfergroups = transfergroups.filter(at__gte=earliest_date) # Construction de la réponse history_groups = [] for opegroup in opegroups: opegroup_dict = { "type": "operation", "id": opegroup.id, "amount": opegroup.amount, "at": opegroup.at, "checkout_id": opegroup.checkout_id, "is_cof": opegroup.is_cof, "comment": opegroup.comment, "entries": [], "on_acc__trigramme": opegroup.on_acc and opegroup.on_acc.trigramme or None, } if request.user.has_perm("kfet.is_team"): opegroup_dict["valid_by__trigramme"] = ( opegroup.valid_by and opegroup.valid_by.trigramme or None ) for ope in opegroup.opes.all(): ope_dict = { "id": ope.id, "type": ope.type, "amount": ope.amount, "article_nb": ope.article_nb, "addcost_amount": ope.addcost_amount, "canceled_at": ope.canceled_at, "article__name": ope.article and ope.article.name or None, "addcost_for__trigramme": ope.addcost_for and ope.addcost_for.trigramme or None, } if request.user.has_perm("kfet.is_team"): ope_dict["canceled_by__trigramme"] = ( ope.canceled_by and ope.canceled_by.trigramme or None ) opegroup_dict["entries"].append(ope_dict) history_groups.append(opegroup_dict) for transfergroup in transfergroups: if transfergroup.filtered_transfers: transfergroup_dict = { "type": "transfer", "id": transfergroup.id, "at": transfergroup.at, "comment": transfergroup.comment, "entries": [], } if request.user.has_perm("kfet.is_team"): transfergroup_dict["valid_by__trigramme"] = ( transfergroup.valid_by and transfergroup.valid_by.trigramme or None ) for transfer in transfergroup.filtered_transfers: transfer_dict = { "id": transfer.id, "amount": transfer.amount, "canceled_at": transfer.canceled_at, "from_acc": transfer.from_acc.trigramme, "to_acc": transfer.to_acc.trigramme, } if request.user.has_perm("kfet.is_team"): transfer_dict["canceled_by__trigramme"] = ( transfer.canceled_by and transfer.canceled_by.trigramme or None ) transfergroup_dict["entries"].append(transfer_dict) history_groups.append(transfergroup_dict) history_groups.sort(key=lambda group: group["at"]) return JsonResponse({"groups": history_groups}) @teamkfet_required def kpsul_articles_data(request): articles = Article.objects.values( "id", "name", "price", "stock", "category_id", "category__name", "category__has_addcost", "category__has_reduction", ).filter(is_sold=True) return JsonResponse({"articles": list(articles)}) @teamkfet_required def history(request): # These limits are only useful for JS datepickers # They don't enforce anything and can be bypassed # Serious checks are done in history_json history_limit, history_no_limit = get_history_limit(request.user) history_no_limit_account_ids = Account.objects.filter( trigramme__in=settings.KFET_HISTORY_NO_DATE_LIMIT_TRIGRAMMES ).values_list("id", flat=True) format_date = lambda date: date.strftime("%Y-%m-%d %H:%M") data = { "filter_form": FilterHistoryForm(), "history_limit": format_date(history_limit), "history_no_limit_account_ids": history_no_limit_account_ids, "history_no_limit": format_date(history_no_limit), } return render(request, "kfet/history.html", data) # ----- # Settings views # ----- class SettingsList(TemplateView): template_name = "kfet/settings.html" config_list = permission_required("kfet.see_config")(SettingsList.as_view()) @method_decorator(kfet_password_auth, name="dispatch") class SettingsUpdate(SuccessMessageMixin, FormView): form_class = KFetConfigForm template_name = "kfet/settings_update.html" success_message = "Paramètres mis à jour" success_url = reverse_lazy("kfet.settings") def form_valid(self, form): # Checking permission if not self.request.user.has_perm("kfet.change_config"): form.add_error( None, ValidationError("Permission refusée", code="permission-denied") ) return self.form_invalid(form) form.save() return super().form_valid(form) config_update = permission_required("kfet.change_config")(SettingsUpdate.as_view()) # ----- # Transfer views # ----- @method_decorator(teamkfet_required, name="dispatch") class TransferView(TemplateView): template_name = "kfet/transfers.html" @teamkfet_required def transfers_create(request): transfer_formset = TransferFormSet(queryset=Transfer.objects.none()) return render( request, "kfet/transfers_create.html", {"transfer_formset": transfer_formset} ) @teamkfet_required @kfet_password_auth def perform_transfers(request): data = {"errors": []} # Checking transfer_formset transfer_formset = TransferFormSet(request.POST) try: if not transfer_formset.is_valid(): for form_errors in transfer_formset.errors: for field, errors in form_errors.items(): if field == "amount": for error in errors: data["errors"].append({"code": "amount", "message": error}) else: # C'est compliqué de trouver le compte qui pose problème... acc_error = True if acc_error: data["errors"].append( { "code": "invalid_acc", "message": "L'un des comptes est invalide ou manquant", } ) return JsonResponse(data, status=400) except ValidationError: data["errors"].append( {"code": "invalid_request", "message": "Requête invalide"} ) return JsonResponse(data, status=400) transfers = transfer_formset.save(commit=False) # Initializing vars required_perms = set( ["kfet.add_transfer"] ) # Required perms to perform all transfers to_accounts_balances = defaultdict(int) # For balances of accounts for transfer in transfers: to_accounts_balances[transfer.from_acc] -= transfer.amount to_accounts_balances[transfer.to_acc] += transfer.amount negative_accounts = [] # Checking if ok on all accounts frozen = set() for account in to_accounts_balances: if account.is_frozen: frozen.add(account.trigramme) (perms, stop) = account.perms_to_perform_operation( amount=to_accounts_balances[account] ) required_perms |= perms if stop: negative_accounts.append(account.trigramme) if frozen: data["errors"].append( { "code": "frozen", "message": "Les comptes suivants sont gelés : {}".format( ", ".join(frozen) ), } ) if negative_accounts: data["errors"].append( { "code": "negative", "message": "Solde insuffisant pour les comptes suivants : {}".format( ", ".join(negative_accounts) ), } ) if data["errors"]: return JsonResponse(data, status=400) if not request.user.has_perms(required_perms): data["missing_perms"] = get_missing_perms(required_perms, request.user) return JsonResponse(data, status=403) # Creating transfer group transfergroup = TransferGroup() if required_perms: transfergroup.valid_by = request.user.profile.account_kfet comment = request.POST.get("comment", "") transfergroup.comment = comment.strip() with transaction.atomic(): # Updating balances accounts for account in to_accounts_balances: Account.objects.filter(pk=account.pk).update( balance=F("balance") + to_accounts_balances[account] ) account.refresh_from_db() account.update_negative() # Saving transfer group transfergroup.save() # Saving all transfers with group for transfer in transfers: transfer.group = transfergroup transfer.save() return JsonResponse({}) @teamkfet_required @kfet_password_auth def cancel_transfers(request): # Pour la réponse data = {"canceled": [], "warnings": {}, "errors": []} # Checking if BAD REQUEST (transfers_pk not int or not existing) try: # Set pour virer les doublons transfers_post = set( map(int, filter(None, request.POST.getlist("transfers[]", []))) ) except ValueError: data["errors"].append( {"code": "invalid_request", "message": "Requête invalide !"} ) return JsonResponse(data, status=400) transfers_all = Transfer.objects.select_related( "group", "from_acc", "from_acc__negative", "to_acc", "to_acc__negative" ).filter(pk__in=transfers_post) transfers_pk = [transfer.pk for transfer in transfers_all] transfers_notexisting = [ transfer for transfer in transfers_post if transfer not in transfers_pk ] if transfers_notexisting: data["errors"].append( { "code": "cancel_missing", "message": "Transferts inexistants : {}".format( ", ".join(map(str, transfers_notexisting)) ), } ) return JsonResponse(data, status=400) transfers_already_canceled = [] # Déjà annulés transfers = [] # Pas déjà annulés required_perms = set() cancel_duration = kfet_config.cancel_duration # Modifs à faire sur les balances des comptes to_accounts_balances = defaultdict(int) for transfer in transfers_all: if transfer.canceled_at: # Transfert déjà annulé, va pour un warning en Response transfers_already_canceled.append(transfer.pk) else: transfers.append(transfer.pk) # Si transfer il y a plus de CANCEL_DURATION, permission requise if transfer.group.at + cancel_duration < timezone.now(): required_perms.add("kfet.cancel_old_operations") # Calcul de toutes modifs à faire en cas de validation # Pour les balances de comptes to_accounts_balances[transfer.from_acc] += transfer.amount to_accounts_balances[transfer.to_acc] += -transfer.amount if not transfers: data["warnings"]["already_canceled"] = transfers_already_canceled return JsonResponse(data) negative_accounts = [] # Checking permissions or stop for account in to_accounts_balances: (perms, stop) = account.perms_to_perform_operation( amount=to_accounts_balances[account] ) required_perms |= perms if stop: negative_accounts.append(account.trigramme) if negative_accounts: data["errors"].append( { "code": "negative", "message": "Solde insuffisant pour les comptes suivants : {}".format( ", ".join(negative_accounts) ), } ) return JsonResponse(data, status=400) if not request.user.has_perms(required_perms): data["missing_perms"] = get_missing_perms(required_perms, request.user) return JsonResponse(data, status=403) canceled_by = required_perms and request.user.profile.account_kfet or None canceled_at = timezone.now() with transaction.atomic(): ( Transfer.objects.filter(pk__in=transfers).update( canceled_by=canceled_by, canceled_at=canceled_at ) ) for account in to_accounts_balances: Account.objects.filter(pk=account.pk).update( balance=F("balance") + to_accounts_balances[account] ) account.refresh_from_db() account.update_negative() transfers = ( Transfer.objects.values("id", "canceled_at", "canceled_by__trigramme") .filter(pk__in=transfers) .order_by("pk") ) data["canceled"] = list(transfers) if transfers_already_canceled: data["warnings"]["already_canceled"] = transfers_already_canceled return JsonResponse(data) class InventoryList(ListView): queryset = ( Inventory.objects.select_related("by", "order") .annotate(nb_articles=Count("articles")) .order_by("-at") ) template_name = "kfet/inventory.html" context_object_name = "inventories" @teamkfet_required @kfet_password_auth def inventory_create(request): articles = Article.objects.select_related("category").order_by( "-is_sold", "category__name", "name" ) initial = [] for article in articles: initial.append( { "is_sold": article.is_sold, "article": article.pk, "stock_old": article.stock, "name": article.name, "category": article.category_id, "category__name": article.category.name, "box_capacity": article.box_capacity or 0, } ) cls_formset = formset_factory(form=InventoryArticleForm, extra=0) if request.POST: formset = cls_formset(request.POST, initial=initial) if not request.user.has_perm("kfet.add_inventory"): messages.error( request, "Permission refusée", extra_tags="permission-denied" ) elif formset.is_valid(): with transaction.atomic(): articles = Article.objects.select_for_update() inventory = Inventory() inventory.by = request.user.profile.account_kfet saved = False for form in formset: if form.cleaned_data["stock_new"] is not None: if not saved: inventory.save() saved = True article = articles.get(pk=form.cleaned_data["article"].pk) stock_old = article.stock stock_new = form.cleaned_data["stock_new"] InventoryArticle.objects.create( inventory=inventory, article=article, stock_old=stock_old, stock_new=stock_new, ) article.stock = stock_new article.save() if saved: messages.success(request, "Inventaire créé") return redirect("kfet.inventory") messages.warning(request, "Bah alors ? On a rien compté ?") else: messages.error(request, "Pas marché") else: formset = cls_formset(initial=initial) return render(request, "kfet/inventory_create.html", {"formset": formset}) class InventoryRead(DetailView): model = Inventory template_name = "kfet/inventory_read.html" context_object_name = "inventory" def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) output_field = DecimalField(max_digits=10, decimal_places=2, default=0) inventory_articles = ( InventoryArticle.objects.select_related("article", "article__category") .filter(inventory=self.object) .annotate( amount_error=ExpressionWrapper( F("stock_error") * F("article__price"), output_field=output_field ) ) .order_by("article__category__name", "article__name") ) context["inventoryarts"] = inventory_articles stats = inventory_articles.aggregate( new=ExpressionWrapper( Sum(F("stock_new") * F("article__price")), output_field=output_field ), error=Sum("amount_error"), old=ExpressionWrapper( Sum(F("stock_old") * F("article__price")), output_field=output_field ), ) context.update( { "total_amount_old": stats["old"], "total_amount_new": stats["new"], "total_amount_error": stats["error"], } ) return context class InventoryDelete(PermissionRequiredMixin, DeleteView): model = Inventory success_url = reverse_lazy("kfet.inventory") success_message = "Inventaire annulé avec succès !" permission_required = "kfet.delete_inventory" def get(self, request, *args, **kwargs): return redirect("kfet.inventory.read", self.kwargs.get(self.pk_url_kwarg)) def delete(self, request, *args, **kwargs): inv = self.get_object() # On met à jour les articles dont c'est le dernier inventaire # .get() ne marche pas avec OuterRef, donc on utilise .filter() avec [:1] update_subquery = InventoryArticle.objects.filter( inventory=inv, article=OuterRef("pk") ).values("stock_old")[:1] Article.objects.annotate(last_env=Max("inventories__at")).filter( last_env=inv.at ).update(stock=Subquery(update_subquery)) # On a tout mis à jour, on peut delete (avec un message) messages.success(request, self.success_message) return super().delete(request, *args, **kwargs) # ----- # Order views # ----- class OrderList(ListView): queryset = Order.objects.select_related("supplier", "inventory") template_name = "kfet/order.html" context_object_name = "orders" def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context["suppliers"] = Supplier.objects.order_by("name") return context @teamkfet_required @kfet_password_auth def order_create(request, pk): supplier = get_object_or_404(Supplier, pk=pk) articles = ( Article.objects.filter(suppliers=supplier.pk) .distinct() .select_related("category") .order_by("-is_sold", "category__name", "name") ) # Force hit to cache articles = list(articles) sales_q = ( Operation.objects.select_related("group") .filter(article__in=articles, canceled_at=None) .values("article") .annotate(nb=Sum("article_nb")) ) scale = WeekScale(last=True, n_steps=5, std_chunk=False) chunks = scale.chunkify_qs(sales_q, field="group__at") sales = [{d["article"]: d["nb"] for d in chunk} for chunk in chunks] initial = [] for article in articles: # Get sales for each 5 last weeks v_all = [chunk.get(article.pk, 0) for chunk in sales] # Take the 3 greatest (eg to avoid 2 weeks of vacations) v_3max = heapq.nlargest(3, v_all) # Get average and standard deviation v_moy = statistics.mean(v_3max) v_et = statistics.pstdev(v_3max, v_moy) # Expected sales for next week v_prev = v_moy + v_et # We want to have 1.5 * the expected sales in stock # (because sometimes some articles are not delivered) c_rec_tot = max(v_prev * 1.5 - article.stock, 0) # If ordered quantity is close enough to a level which can led to free # boxes, we increase it to this level. if article.box_capacity: c_rec_temp = c_rec_tot / article.box_capacity if c_rec_temp >= 10: c_rec = round(c_rec_temp) elif c_rec_temp > 5: c_rec = 10 elif c_rec_temp > 2: c_rec = 5 else: c_rec = round(c_rec_temp) initial.append( { "article": article.pk, "name": article.name, "category": article.category_id, "category__name": article.category.name, "stock": article.stock, "box_capacity": article.box_capacity, "v_all": v_all, "v_moy": round(v_moy), "v_et": round(v_et), "v_prev": round(v_prev), "c_rec": article.box_capacity and c_rec or round(c_rec_tot), "is_sold": article.is_sold, } ) cls_formset = formset_factory(form=OrderArticleForm, extra=0) if request.POST: formset = cls_formset(request.POST, initial=initial) if not request.user.has_perm("kfet.add_order"): messages.error( request, "Permission refusée", extra_tags="permission-denied" ) elif formset.is_valid(): order = Order() order.supplier = supplier saved = False for form in formset: if form.cleaned_data["quantity_ordered"] is not None: if not saved: order.save() saved = True article = form.cleaned_data["article"] q_ordered = form.cleaned_data["quantity_ordered"] if article.box_capacity: q_ordered *= article.box_capacity OrderArticle.objects.create( order=order, article=article, quantity_ordered=q_ordered ) if saved: messages.success(request, "Commande créée") return redirect("kfet.order.read", order.pk) messages.warning(request, "Rien commandé => Pas de commande") else: messages.error(request, "Corrigez les erreurs") else: formset = cls_formset(initial=initial) scale.label_fmt = "S-{rev_i}" return render( request, "kfet/order_create.html", {"supplier": supplier, "formset": formset, "scale": scale}, ) class OrderRead(DetailView): model = Order template_name = "kfet/order_read.html" context_object_name = "order" def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) orderarticles = ( OrderArticle.objects.select_related("article", "article__category") .filter(order=self.object) .order_by("article__category__name", "article__name") ) context["orderarts"] = orderarticles mail = ( "Bonjour,\n\nNous voudrions pour le ##DATE## à la K-Fêt de " "l'ENS Ulm :" ) category = 0 for orderarticle in orderarticles: if category != orderarticle.article.category: category = orderarticle.article.category mail += "\n" nb = orderarticle.quantity_ordered box = "" if orderarticle.article.box_capacity: nb /= orderarticle.article.box_capacity if nb >= 2: box = " %ss de" % orderarticle.article.box_type else: box = " %s de" % orderarticle.article.box_type name = orderarticle.article.name.capitalize() mail += "\n- %s%s %s" % (round(nb), box, name) mail += ( "\n\nMerci d'appeler le numéro suivant lorsque les livreurs " "sont là : ##TELEPHONE##\nCordialement,\n##PRENOM## ##NOM## " ", pour la K-Fêt de l'ENS Ulm" ) context["mail"] = mail return context @teamkfet_required @kfet_password_auth def order_to_inventory(request, pk): order = get_object_or_404(Order, pk=pk) if hasattr(order, "inventory"): raise Http404 supplier_prefetch = Prefetch( "article__supplierarticle_set", queryset=( SupplierArticle.objects.filter(supplier=order.supplier).order_by("-at") ), to_attr="supplier", ) order_articles = ( OrderArticle.objects.filter(order=order.pk) .select_related("article", "article__category") .prefetch_related(supplier_prefetch) .order_by("article__category__name", "article__name") ) initial = [] for order_article in order_articles: article = order_article.article initial.append( { "article": article.pk, "name": article.name, "category": article.category_id, "category__name": article.category.name, "quantity_ordered": order_article.quantity_ordered, "quantity_received": order_article.quantity_ordered, "price_HT": article.supplier[0].price_HT, "TVA": article.supplier[0].TVA, "rights": article.supplier[0].rights, } ) cls_formset = formset_factory(OrderArticleToInventoryForm, extra=0) if request.method == "POST": formset = cls_formset(request.POST, initial=initial) if not request.user.has_perm("kfet.order_to_inventory"): messages.error( request, "Permission refusée", extra_tags="permission-denied" ) elif formset.is_valid(): with transaction.atomic(): inventory = Inventory.objects.create( order=order, by=request.user.profile.account_kfet ) new_supplierarticle = [] new_inventoryarticle = [] for form in formset: q_received = form.cleaned_data["quantity_received"] article = form.cleaned_data["article"] price_HT = form.cleaned_data["price_HT"] TVA = form.cleaned_data["TVA"] rights = form.cleaned_data["rights"] if any( ( form.initial["price_HT"] != price_HT, form.initial["TVA"] != TVA, form.initial["rights"] != rights, ) ): new_supplierarticle.append( SupplierArticle( supplier=order.supplier, article=article, price_HT=price_HT, TVA=TVA, rights=rights, ) ) ( OrderArticle.objects.filter( order=order, article=article ).update(quantity_received=q_received) ) new_inventoryarticle.append( InventoryArticle( inventory=inventory, article=article, stock_old=article.stock, stock_new=article.stock + q_received, ) ) article.stock += q_received if q_received > 0: article.is_sold = True article.save() SupplierArticle.objects.bulk_create(new_supplierarticle) InventoryArticle.objects.bulk_create(new_inventoryarticle) messages.success(request, "C'est tout bon !") return redirect("kfet.order") else: messages.error(request, "Corrigez les erreurs") else: formset = cls_formset(initial=initial) return render( request, "kfet/order_to_inventory.html", {"formset": formset, "order": order} ) @method_decorator(kfet_password_auth, name="dispatch") class SupplierUpdate(SuccessMessageMixin, UpdateView): model = Supplier template_name = "kfet/supplier_form.html" fields = ["name", "address", "email", "phone", "comment"] success_url = reverse_lazy("kfet.order") sucess_message = "Données fournisseur mis à jour" # Surcharge de la validation def form_valid(self, form): # Checking permission if not self.request.user.has_perm("kfet.change_supplier"): form.add_error( None, ValidationError("Permission refusée", code="permission-denied") ) return self.form_invalid(form) # Updating return super().form_valid(form) # ========== # Statistics # ========== # --------------- # Vues génériques # --------------- # source : docs.djangoproject.com/fr/1.10/topics/class-based-views/mixins/ class JSONResponseMixin: """ A mixin that can be used to render a JSON response. """ def render_to_json_response(self, context, **response_kwargs): """ Returns a JSON response, transforming 'context' to make the payload. """ return JsonResponse(self.get_data(context), **response_kwargs) def get_data(self, context): """ Returns an object that will be serialized as JSON by json.dumps(). """ # Note: This is *EXTREMELY* naive; in reality, you'll need # to do much more complex handling to ensure that arbitrary # objects -- such as Django model instances or querysets # -- can be serialized as JSON. return context class JSONDetailView(JSONResponseMixin, BaseDetailView): """Returns a DetailView that renders a JSON.""" def render_to_response(self, context): return self.render_to_json_response(context) class SingleResumeStat(JSONDetailView): """ Génère l'interface de sélection pour les statistiques d'un compte/article. L'interface est constituée d'une série de boutons, qui récupèrent et graphent des statistiques du même type, sur le même objet mais avec des arguments différents. Attributs : - url_stat : URL où récupérer les statistiques - stats : liste de dictionnaires avec les clés suivantes : - label : texte du bouton - url_params : paramètres GET à rajouter à `url_stat` - default : si `True`, graphe à montrer par défaut On peut aussi définir `stats` dynamiquement, via la fonction `get_stats`. """ url_stat = None stats = [] def get_stats(self): return self.stats def get_context_data(self, **kwargs): # On n'hérite pas context = {} stats = [] # On peut avoir récupéré self.object via pk ou slug if self.pk_url_kwarg in self.kwargs: url_pk = getattr(self.object, self.pk_url_kwarg) else: url_pk = getattr(self.object, self.slug_url_kwarg) for stat_def in self.get_stats(): url_params_d = stat_def.get("url_params", {}) if len(url_params_d) > 0: url_params = "?{}".format(urlencode(url_params_d)) else: url_params = "" stats.append( { "label": stat_def["label"], "url": "{url}{params}".format( url=reverse(self.url_stat, args=[url_pk]), params=url_params ), "default": stat_def.get("default", False), } ) context["stats"] = stats return context class UserAccountMixin: """ Mixin qui vérifie que le compte traité par la vue est celui de l'utilisateur·ice actuel·le. Dans le cas contraire, renvoie un Http404. """ def get_object(self, *args, **kwargs): obj = super().get_object(*args, **kwargs) if self.request.user != obj.user: raise Http404 return obj class ScaleMixin(object): """Mixin pour utiliser les outils de `kfet.statistic`.""" def get_context_data(self, *args, **kwargs): # On n'hérite pas form = StatScaleForm(self.request.GET, prefix="scale") if not form.is_valid(): raise SuspiciousOperation( "Invalid StatScaleForm. Did someone tamper with the GET parameters ?" ) scale_name = form.cleaned_data.pop("name") scale_cls = SCALE_DICT.get(scale_name) self.scale = scale_cls(**form.cleaned_data) return {"labels": self.scale.get_labels()} # ----------------------- # Evolution Balance perso # ----------------------- @method_decorator(login_required, name="dispatch") class AccountStatBalanceList(UserAccountMixin, SingleResumeStat): """ Menu général pour l'historique de balance d'un compte """ model = Account slug_url_kwarg = "trigramme" slug_field = "trigramme" url_stat = "kfet.account.stat.balance" stats = [ {"label": "Tout le temps"}, {"label": "1 an", "url_params": {"last_days": 365}}, {"label": "6 mois", "url_params": {"last_days": 183}}, {"label": "3 mois", "url_params": {"last_days": 90}, "default": True}, {"label": "30 jours", "url_params": {"last_days": 30}}, ] @method_decorator(login_required, name="dispatch") class AccountStatBalance(UserAccountMixin, JSONDetailView): """ Statistiques (JSON) d'historique de balance d'un compte. Prend en compte les opérations et transferts sur la période donnée. """ model = Account slug_url_kwarg = "trigramme" slug_field = "trigramme" def get_changes_list(self, last_days=None, begin_date=None, end_date=None): account = self.object # prepare filters if last_days is not None: end_date = timezone.now() begin_date = end_date - timezone.timedelta(days=last_days) # prepare querysets # TODO: retirer les opgroup dont tous les op sont annulées opegroups = OperationGroup.objects.filter(on_acc=account) transfers = Transfer.objects.filter(canceled_at=None).select_related("group") recv_transfers = transfers.filter(to_acc=account) sent_transfers = transfers.filter(from_acc=account) # apply filters if begin_date is not None: opegroups = opegroups.filter(at__gte=begin_date) recv_transfers = recv_transfers.filter(group__at__gte=begin_date) sent_transfers = sent_transfers.filter(group__at__gte=begin_date) if end_date is not None: opegroups = opegroups.filter(at__lte=end_date) recv_transfers = recv_transfers.filter(group__at__lte=end_date) sent_transfers = sent_transfers.filter(group__at__lte=end_date) # On transforme tout ça en une liste de dictionnaires sous la forme # {'at': date, # 'amount': changement de la balance (négatif si diminue la balance, # positif si l'augmente), # 'label': text descriptif, # 'balance': état de la balance après l'action (0 pour le moment, # sera mis à jour lors d'une # autre passe) # } actions = [] actions.append( { "at": (begin_date or account.created_at).isoformat(), "amount": 0, "balance": 0, } ) actions.append( {"at": (end_date or timezone.now()).isoformat(), "amount": 0, "balance": 0} ) actions += ( [ {"at": ope_grp.at.isoformat(), "amount": ope_grp.amount, "balance": 0} for ope_grp in opegroups ] + [ {"at": tr.group.at.isoformat(), "amount": tr.amount, "balance": 0} for tr in recv_transfers ] + [ {"at": tr.group.at.isoformat(), "amount": -tr.amount, "balance": 0} for tr in sent_transfers ] ) # Maintenant on trie la liste des actions par ordre du plus récent # an plus ancien et on met à jour la balance if len(actions) > 1: actions = sorted(actions, key=lambda k: k["at"], reverse=True) actions[0]["balance"] = account.balance for i in range(len(actions) - 1): actions[i + 1]["balance"] = ( actions[i]["balance"] - actions[i + 1]["amount"] ) return actions def get_context_data(self, *args, **kwargs): context = {} form = AccountStatForm(self.request.GET) if not form.is_valid(): raise SuspiciousOperation( "Invalid AccountStatForm. Did someone tamper with the GET parameters ?" ) changes = self.get_changes_list(**form.cleaned_data) context["charts"] = [ {"color": "rgb(200, 20, 60)", "label": "Balance", "values": changes} ] context["is_time_chart"] = True if len(changes) > 0: context["min_date"] = changes[-1]["at"] context["max_date"] = changes[0]["at"] # TODO: offset return context # ------------------------ # Consommation personnelle # ------------------------ @method_decorator(login_required, name="dispatch") class AccountStatOperationList(UserAccountMixin, SingleResumeStat): """ Menu général pour l'historique de consommation d'un compte """ model = Account slug_url_kwarg = "trigramme" slug_field = "trigramme" url_stat = "kfet.account.stat.operation" def get_stats(self): scales_def = [ ( "Tout le temps", MonthScale, {"last": True, "begin": self.object.created_at.replace(tzinfo=None)}, False, ), ("1 an", MonthScale, {"last": True, "n_steps": 12}, False), ("3 mois", WeekScale, {"last": True, "n_steps": 13}, True), ("2 semaines", DayScale, {"last": True, "n_steps": 14}, False), ] return scale_url_params(scales_def) @method_decorator(login_required, name="dispatch") class AccountStatOperation(UserAccountMixin, ScaleMixin, JSONDetailView): """ Statistiques (JSON) de consommation (nb d'items achetés) d'un compte. """ model = Account slug_url_kwarg = "trigramme" slug_field = "trigramme" def get_context_data(self, *args, **kwargs): context = super().get_context_data(*args, **kwargs) operations = ( Operation.objects.filter( type=Operation.PURCHASE, group__on_acc=self.object, canceled_at=None ) .values("article_nb", "group__at") .order_by("group__at") ) # On compte les opérations nb_ventes = self.scale.chunkify_qs( operations, field="group__at", aggregate=Sum("article_nb") ) context["charts"] = [ { "color": "rgb(200, 20, 60)", "label": "NB items achetés", "values": nb_ventes, } ] return context # ------------------------ # Article Statistiques Last # ------------------------ @method_decorator(teamkfet_required, name="dispatch") class ArticleStatSalesList(SingleResumeStat): """ Menu pour les statistiques de vente d'un article. """ model = Article nb_default = 2 url_stat = "kfet.article.stat.sales" def get_stats(self): first_conso = ( Operation.objects.filter(article=self.object) .order_by("group__at") .values_list("group__at", flat=True) .first() ) if first_conso is None: # On le crée dans le passé au cas où first_conso = timezone.now() - timedelta(seconds=1) scales_def = [ ( "Tout le temps", MonthScale, {"last": True, "begin": first_conso.strftime("%Y-%m-%d %H:%M:%S")}, False, ), ("1 an", MonthScale, {"last": True, "n_steps": 12}, False), ("3 mois", WeekScale, {"last": True, "n_steps": 13}, True), ("2 semaines", DayScale, {"last": True, "n_steps": 14}, False), ] return scale_url_params(scales_def) @method_decorator(teamkfet_required, name="dispatch") class ArticleStatSales(ScaleMixin, JSONDetailView): """ Statistiques (JSON) de vente d'un article. Sépare LIQ et les comptes K-Fêt, et rajoute le total. """ model = Article context_object_name = "article" def get_context_data(self, *args, **kwargs): context = super().get_context_data(*args, **kwargs) scale = self.scale all_purchases = ( Operation.objects.filter( type=Operation.PURCHASE, article=self.object, canceled_at=None ) .values("group__at", "article_nb") .order_by("group__at") ) cof_accts = all_purchases.filter(group__on_acc__cofprofile__is_cof=True) noncof_accts = all_purchases.exclude(group__on_acc__cofprofile__is_cof=True) nb_cof = scale.chunkify_qs( cof_accts, field="group__at", aggregate=Sum("article_nb") ) nb_noncof = scale.chunkify_qs( noncof_accts, field="group__at", aggregate=Sum("article_nb") ) nb_ventes = [n1 + n2 for n1, n2 in zip(nb_cof, nb_noncof)] context["charts"] = [ { "color": "rgb(200, 20, 60)", "label": "Toutes consommations", "values": nb_ventes, }, {"color": "rgb(54, 162, 235)", "label": "Comptes K-Fêt", "values": nb_cof}, { "color": "rgb(255, 205, 86)", "label": "LIQ", "values": nb_noncof, }, ] return context # --- # Autocompletion views # --- class AccountCreateAutocompleteView(PermissionRequiredMixin, AutocompleteView): template_name = "kfet/search_results.html" permission_required = "kfet.is_team" search_composer = kfet_autocomplete class AccountSearchAutocompleteView(PermissionRequiredMixin, AutocompleteView): permission_required = "kfet.is_team" search_composer = kfet_account_only_autocomplete