import ast import heapq import statistics from collections import defaultdict from decimal import Decimal from typing import List from urllib.parse import urlencode from django.contrib import messages from django.contrib.auth.decorators import login_required, permission_required from django.contrib.auth.mixins import PermissionRequiredMixin from django.contrib.auth.models import Permission, User from django.contrib.messages.views import SuccessMessageMixin from django.db import transaction from django.db.models import Count, F, Prefetch, Q, Sum from django.forms import formset_factory from django.http import Http404, JsonResponse from django.shortcuts import get_object_or_404, redirect, render from django.urls import reverse, reverse_lazy from django.utils import timezone from django.utils.decorators import method_decorator from django.views.generic import DetailView, FormView, ListView, TemplateView from django.views.generic.detail import BaseDetailView from django.views.generic.edit import CreateView, DeleteView, UpdateView from gestioncof.models import CofProfile from kfet import KFET_DELETED_TRIGRAMME, consumers from kfet.auth.decorators import kfet_password_auth from kfet.config import kfet_config from kfet.decorators import teamkfet_required from kfet.forms import ( AccountForm, AccountNegativeForm, AccountNoTriForm, AccountPwdForm, AccountRestrictForm, AccountTriForm, AddcostForm, ArticleForm, ArticleRestrictForm, CategoryForm, CheckoutForm, CheckoutRestrictForm, CheckoutStatementCreateForm, CheckoutStatementUpdateForm, CofForm, FilterHistoryForm, InventoryArticleForm, KFetConfigForm, KPsulAccountForm, KPsulCheckoutForm, KPsulOperationFormSet, KPsulOperationGroupForm, OrderArticleForm, OrderArticleToInventoryForm, TransferFormSet, UserForm, UserGroupForm, UserInfoForm, ) from kfet.models import ( Account, AccountNegative, Article, ArticleCategory, Checkout, CheckoutStatement, Inventory, InventoryArticle, Operation, OperationGroup, Order, OrderArticle, Supplier, SupplierArticle, Transfer, TransferGroup, ) from kfet.statistic import ScaleMixin, WeekScale, last_stats_manifest from .auth import KFET_GENERIC_TRIGRAMME from .auth.views import ( # noqa AccountGroupCreate, AccountGroupUpdate, account_group, login_generic, ) def put_cleaned_data_in_dict(dict, form): for field in form.cleaned_data: dict[field] = form.cleaned_data[field] # ----- # Account views # ----- # Account - General @login_required @teamkfet_required def account(request): accounts = Account.objects.select_related("cofprofile__user").order_by("trigramme") return render(request, "kfet/account.html", {"accounts": accounts}) @login_required @teamkfet_required def account_is_validandfree_ajax(request): if not request.GET.get("trigramme", ""): raise Http404 trigramme = request.GET.get("trigramme") data = Account.is_validandfree(trigramme) return JsonResponse(data) # Account - Create @login_required @teamkfet_required @kfet_password_auth def account_create(request): # Enregistrement if request.method == "POST": trigramme_form = AccountTriForm(request.POST) # Peuplement des forms username = request.POST.get("username") login_clipper = request.POST.get("login_clipper") forms = get_account_create_forms( request, username=username, login_clipper=login_clipper ) account_form = forms["account_form"] cof_form = forms["cof_form"] user_form = forms["user_form"] if all( ( user_form.is_valid(), cof_form.is_valid(), trigramme_form.is_valid(), account_form.is_valid(), ) ): # Checking permission if not request.user.has_perm("kfet.add_account"): messages.error(request, "Permission refusée") else: data = {} # Fill data for Account.save() put_cleaned_data_in_dict(data, user_form) put_cleaned_data_in_dict(data, cof_form) try: account = trigramme_form.save(data=data) account_form = AccountNoTriForm(request.POST, instance=account) account_form.save() messages.success(request, "Compte créé : %s" % account.trigramme) return redirect("kfet.account.create") except Account.UserHasAccount as e: messages.error( request, "Cet utilisateur a déjà un compte K-Fêt : %s" % e.trigramme, ) else: initial = {"trigramme": request.GET.get("trigramme", "")} trigramme_form = AccountTriForm(initial=initial) account_form = None cof_form = None user_form = None return render( request, "kfet/account_create.html", { "trigramme_form": trigramme_form, "account_form": account_form, "cof_form": cof_form, "user_form": user_form, }, ) def account_form_set_readonly_fields(user_form, cof_form): user_form.fields["username"].widget.attrs["readonly"] = True cof_form.fields["login_clipper"].widget.attrs["readonly"] = True cof_form.fields["is_cof"].widget.attrs["disabled"] = True def get_account_create_forms( request=None, username=None, login_clipper=None, fullname=None ): user = None clipper = False if login_clipper and (login_clipper == username or not username): # à partir d'un clipper # le user associé à ce clipper ne devrait pas encore exister clipper = True try: # Vérification que clipper ne soit pas déjà dans User user = User.objects.get(username=login_clipper) # Ici, on nous a menti, le user existe déjà username = user.username clipper = False except User.DoesNotExist: # Clipper (sans user déjà existant) # UserForm - Prefill user_initial = { "username": login_clipper, "email": "%s@clipper.ens.fr" % login_clipper, } if fullname: # Prefill du nom et prénom names = fullname.split() # Le premier, c'est le prénom user_initial["first_name"] = names[0] if len(names) > 1: # Si d'autres noms -> tous dans le nom de famille user_initial["last_name"] = " ".join(names[1:]) # CofForm - Prefill cof_initial = {"login_clipper": login_clipper} # Form créations if request: user_form = UserForm(request.POST, initial=user_initial) cof_form = CofForm(request.POST, initial=cof_initial) else: user_form = UserForm(initial=user_initial) cof_form = CofForm(initial=cof_initial) # Protection (read-only) des champs username et login_clipper account_form_set_readonly_fields(user_form, cof_form) if username and not clipper: try: user = User.objects.get(username=username) # le user existe déjà # récupération du profil cof (cof, _) = CofProfile.objects.get_or_create(user=user) # UserForm + CofForm - Création à partir des instances existantes if request: user_form = UserForm(request.POST, instance=user) cof_form = CofForm(request.POST, instance=cof) else: user_form = UserForm(instance=user) cof_form = CofForm(instance=cof) # Protection (read-only) des champs username, login_clipper et is_cof account_form_set_readonly_fields(user_form, cof_form) except User.DoesNotExist: # le username donnée n'existe pas -> Création depuis rien # (éventuellement en cours avec erreurs précédemment) pass if not user and not clipper: # connaît pas du tout, faut tout remplir if request: user_form = UserForm(request.POST) cof_form = CofForm(request.POST) else: user_form = UserForm() cof_form = CofForm() # mais on laisse le username en écriture cof_form.fields["login_clipper"].widget.attrs["readonly"] = True cof_form.fields["is_cof"].widget.attrs["disabled"] = True if request: account_form = AccountNoTriForm(request.POST) else: account_form = AccountNoTriForm() return {"account_form": account_form, "cof_form": cof_form, "user_form": user_form} @login_required @teamkfet_required def account_create_ajax(request, username=None, login_clipper=None, fullname=None): forms = get_account_create_forms( request=None, username=username, login_clipper=login_clipper, fullname=fullname ) return render( request, "kfet/account_create_form.html", { "account_form": forms["account_form"], "cof_form": forms["cof_form"], "user_form": forms["user_form"], }, ) # Account - Read @login_required def account_read(request, trigramme): account = get_object_or_404(Account, trigramme=trigramme) # Checking permissions if not account.readable or ( not request.user.has_perm("kfet.is_team") and request.user != account.user ): raise Http404 addcosts = ( OperationGroup.objects.filter(opes__addcost_for=account, opes__canceled_at=None) .extra({"date": "date(at)"}) .values("date") .annotate(sum_addcosts=Sum("opes__addcost_amount")) .order_by("-date") ) return render( request, "kfet/account_read.html", {"account": account, "addcosts": addcosts} ) # Account - Update @login_required @kfet_password_auth def account_update(request, trigramme): account = get_object_or_404(Account, trigramme=trigramme) # Checking permissions if not account.editable or ( not request.user.has_perm("kfet.is_team") and request.user != account.user ): raise Http404 user_info_form = UserInfoForm(instance=account.user) if request.user.has_perm("kfet.is_team"): group_form = UserGroupForm(instance=account.user) account_form = AccountForm(instance=account) pwd_form = AccountPwdForm() if account.balance < 0 and not hasattr(account, "negative"): AccountNegative.objects.create(account=account, start=timezone.now()) account.refresh_from_db() if hasattr(account, "negative"): negative_form = AccountNegativeForm(instance=account.negative) else: negative_form = None else: account_form = AccountRestrictForm(instance=account) group_form = None negative_form = None pwd_form = None if request.method == "POST": # Update attempt success = False missing_perm = True if request.user.has_perm("kfet.is_team"): account_form = AccountForm(request.POST, instance=account) group_form = UserGroupForm(request.POST, instance=account.user) pwd_form = AccountPwdForm(request.POST) if hasattr(account, "negative"): negative_form = AccountNegativeForm( request.POST, instance=account.negative ) if request.user.has_perm("kfet.change_account") and account_form.is_valid(): missing_perm = False # Updating account_form.save() # Checking perm to update password if ( request.user.has_perm("kfet.change_account_password") and pwd_form.is_valid() ): pwd = pwd_form.cleaned_data["pwd1"] account.change_pwd(pwd) account.save() messages.success(request, "Mot de passe mis à jour") # Checking perm to manage perms if request.user.has_perm("kfet.manage_perms") and group_form.is_valid(): group_form.save() # Checking perm to manage negative if hasattr(account, "negative"): balance_offset_old = 0 if account.negative.balance_offset: balance_offset_old = account.negative.balance_offset if ( hasattr(account, "negative") and request.user.has_perm("kfet.change_accountnegative") and negative_form.is_valid() ): balance_offset_new = negative_form.cleaned_data["balance_offset"] if not balance_offset_new: balance_offset_new = 0 balance_offset_diff = balance_offset_new - balance_offset_old Account.objects.filter(pk=account.pk).update( balance=F("balance") + balance_offset_diff ) negative_form.save() if ( Account.objects.get(pk=account.pk).balance >= 0 and not balance_offset_new ): AccountNegative.objects.get(account=account).delete() success = True messages.success( request, "Informations du compte %s mises à jour" % account.trigramme, ) # Modification de ses propres informations if request.user == account.user: missing_perm = False account.refresh_from_db() account_form = AccountRestrictForm(request.POST, instance=account) pwd_form = AccountPwdForm(request.POST) if account_form.is_valid(): account_form.save() success = True messages.success(request, "Vos informations ont été mises à jour") if request.user.has_perm("kfet.is_team") and pwd_form.is_valid(): pwd = pwd_form.cleaned_data["pwd1"] account.change_pwd(pwd) account.save() messages.success(request, "Votre mot de passe a été mis à jour") if missing_perm: messages.error(request, "Permission refusée") if success: return redirect("kfet.account.read", account.trigramme) else: messages.error( request, "Informations non mises à jour. Corrigez les erreurs" ) return render( request, "kfet/account_update.html", { "user_info_form": user_info_form, "account": account, "account_form": account_form, "group_form": group_form, "negative_form": negative_form, "pwd_form": pwd_form, }, ) # Account - Delete class AccountDelete(PermissionRequiredMixin, DeleteView): model = Account slug_field = "trigramme" slug_url_kwarg = "trigramme" success_url = reverse_lazy("kfet.account") success_message = "Compte supprimé avec succès !" permission_required = "kfet.delete_account" http_method_names = ["post"] def delete(self, request, *args, **kwargs): self.object = self.get_object() if self.object.balance >= 0.01: messages.error( request, "Impossible de supprimer un compte " "avec une balance strictement positive !", ) return redirect("kfet.account.read", self.object.trigramme) if self.object.trigramme in [ "LIQ", KFET_GENERIC_TRIGRAMME, KFET_DELETED_TRIGRAMME, "#13", ]: messages.error(request, "Impossible de supprimer un trigramme protégé !") return redirect("kfet.account.read", self.object.trigramme) # SuccessMessageMixin does not work with DeleteView, see : # https://code.djangoproject.com/ticket/21926 messages.success(request, self.success_message) return super().delete(request, *args, **kwargs) class AccountNegativeList(ListView): queryset = AccountNegative.objects.select_related( "account", "account__cofprofile__user" ).exclude(account__trigramme="#13") template_name = "kfet/account_negative.html" context_object_name = "negatives" def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) real_balances = (neg.account.real_balance for neg in self.object_list) context["negatives_sum"] = sum(real_balances) return context # ----- # Checkout views # ----- # Checkout - General class CheckoutList(ListView): model = Checkout template_name = "kfet/checkout.html" context_object_name = "checkouts" # Checkout - Create @method_decorator(kfet_password_auth, name="dispatch") class CheckoutCreate(SuccessMessageMixin, CreateView): model = Checkout template_name = "kfet/checkout_create.html" form_class = CheckoutForm success_message = "Nouvelle caisse : %(name)s" # Surcharge de la validation def form_valid(self, form): # Checking permission if not self.request.user.has_perm("kfet.add_checkout"): form.add_error(None, "Permission refusée") return self.form_invalid(form) # Creating form.instance.created_by = self.request.user.profile.account_kfet form.save() return super().form_valid(form) # Checkout - Read class CheckoutRead(DetailView): model = Checkout template_name = "kfet/checkout_read.html" context_object_name = "checkout" def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context["statements"] = context["checkout"].statements.order_by("-at") return context # Checkout - Update @method_decorator(kfet_password_auth, name="dispatch") class CheckoutUpdate(SuccessMessageMixin, UpdateView): model = Checkout template_name = "kfet/checkout_update.html" form_class = CheckoutRestrictForm success_message = "Informations mises à jour pour la caisse : %(name)s" # Surcharge de la validation def form_valid(self, form): # Checking permission if not self.request.user.has_perm("kfet.change_checkout"): form.add_error(None, "Permission refusée") return self.form_invalid(form) # Updating return super().form_valid(form) # ----- # Checkout Statement views # ----- # Checkout Statement - General class CheckoutStatementList(ListView): model = CheckoutStatement queryset = CheckoutStatement.objects.order_by("-at") template_name = "kfet/checkoutstatement.html" context_object_name = "checkoutstatements" # Checkout Statement - Create def getAmountTaken(data): return Decimal( data.taken_001 * 0.01 + data.taken_002 * 0.02 + data.taken_005 * 0.05 + data.taken_01 * 0.1 + data.taken_02 * 0.2 + data.taken_05 * 0.5 + data.taken_1 * 1 + data.taken_2 * 2 + data.taken_5 * 5 + data.taken_10 * 10 + data.taken_20 * 20 + data.taken_50 * 50 + data.taken_100 * 100 + data.taken_200 * 200 + data.taken_500 * 500 + float(data.taken_cheque) ) def getAmountBalance(data): return Decimal( data["balance_001"] * 0.01 + data["balance_002"] * 0.02 + data["balance_005"] * 0.05 + data["balance_01"] * 0.1 + data["balance_02"] * 0.2 + data["balance_05"] * 0.5 + data["balance_1"] * 1 + data["balance_2"] * 2 + data["balance_5"] * 5 + data["balance_10"] * 10 + data["balance_20"] * 20 + data["balance_50"] * 50 + data["balance_100"] * 100 + data["balance_200"] * 200 + data["balance_500"] * 500 ) @method_decorator(kfet_password_auth, name="dispatch") class CheckoutStatementCreate(SuccessMessageMixin, CreateView): model = CheckoutStatement template_name = "kfet/checkoutstatement_create.html" form_class = CheckoutStatementCreateForm success_message = "Nouveau relevé : %(checkout)s - %(at)s" def get_success_url(self): return reverse_lazy( "kfet.checkout.read", kwargs={"pk": self.kwargs["pk_checkout"]} ) def get_success_message(self, cleaned_data): return self.success_message % dict( cleaned_data, checkout=self.object.checkout.name, at=self.object.at ) def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) checkout = Checkout.objects.get(pk=self.kwargs["pk_checkout"]) context["checkout"] = checkout return context def form_valid(self, form): # Checking permission if not self.request.user.has_perm("kfet.add_checkoutstatement"): form.add_error(None, "Permission refusée") return self.form_invalid(form) # Creating form.instance.amount_taken = getAmountTaken(form.instance) if not form.instance.not_count: form.instance.balance_new = getAmountBalance(form.cleaned_data) form.instance.checkout_id = self.kwargs["pk_checkout"] form.instance.by = self.request.user.profile.account_kfet return super().form_valid(form) @method_decorator(kfet_password_auth, name="dispatch") class CheckoutStatementUpdate(SuccessMessageMixin, UpdateView): model = CheckoutStatement template_name = "kfet/checkoutstatement_update.html" form_class = CheckoutStatementUpdateForm success_message = "Relevé modifié" def get_success_url(self): return reverse_lazy( "kfet.checkout.read", kwargs={"pk": self.kwargs["pk_checkout"]} ) def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) checkout = Checkout.objects.get(pk=self.kwargs["pk_checkout"]) context["checkout"] = checkout return context def form_valid(self, form): # Checking permission if not self.request.user.has_perm("kfet.change_checkoutstatement"): form.add_error(None, "Permission refusée") return self.form_invalid(form) # Updating form.instance.amount_taken = getAmountTaken(form.instance) return super().form_valid(form) # ----- # Category views # ----- # Category - General class CategoryList(ListView): queryset = ArticleCategory.objects.prefetch_related("articles").order_by("name") template_name = "kfet/category.html" context_object_name = "categories" # Category - Update @method_decorator(kfet_password_auth, name="dispatch") class CategoryUpdate(SuccessMessageMixin, UpdateView): model = ArticleCategory template_name = "kfet/category_update.html" form_class = CategoryForm success_url = reverse_lazy("kfet.category") success_message = "Informations mises à jour pour la catégorie : %(name)s" # Surcharge de la validation def form_valid(self, form): # Checking permission if not self.request.user.has_perm("kfet.change_articlecategory"): form.add_error(None, "Permission refusée") return self.form_invalid(form) # Updating return super().form_valid(form) # ----- # Article views # ----- # Article - General class ArticleList(ListView): queryset = ( Article.objects.select_related("category") .prefetch_related( Prefetch( "inventories", queryset=Inventory.objects.order_by("-at"), to_attr="inventory", ) ) .order_by("category__name", "-is_sold", "name") ) template_name = "kfet/article.html" context_object_name = "articles" def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) articles = context[self.context_object_name] context["nb_articles"] = len(articles) context[self.context_object_name] = articles.filter(is_sold=True) context["not_sold_articles"] = articles.filter(is_sold=False) return context # Article - Create @method_decorator(kfet_password_auth, name="dispatch") class ArticleCreate(SuccessMessageMixin, CreateView): model = Article template_name = "kfet/article_create.html" form_class = ArticleForm success_message = "Nouvel item : %(category)s - %(name)s" # Surcharge de la validation def form_valid(self, form): # Checking permission if not self.request.user.has_perm("kfet.add_article"): form.add_error(None, "Permission refusée") return self.form_invalid(form) # Save ici pour save le manytomany suppliers article = form.save() # Save des suppliers déjà existant for supplier in form.cleaned_data["suppliers"]: SupplierArticle.objects.create(article=article, supplier=supplier) # Nouveau supplier supplier_new = form.cleaned_data["supplier_new"].strip() if supplier_new: supplier, created = Supplier.objects.get_or_create(name=supplier_new) if created: SupplierArticle.objects.create(article=article, supplier=supplier) # Inventaire avec stock initial inventory = Inventory() inventory.by = self.request.user.profile.account_kfet inventory.save() InventoryArticle.objects.create( inventory=inventory, article=article, stock_old=article.stock, stock_new=article.stock, ) # Creating return super().form_valid(form) # Article - Read class ArticleRead(DetailView): model = Article template_name = "kfet/article_read.html" context_object_name = "article" def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) inventoryarts = ( InventoryArticle.objects.filter(article=self.object) .select_related("inventory") .order_by("-inventory__at") ) context["inventoryarts"] = inventoryarts supplierarts = ( SupplierArticle.objects.filter(article=self.object) .select_related("supplier") .order_by("-at") ) context["supplierarts"] = supplierarts return context # Article - Update @method_decorator(kfet_password_auth, name="dispatch") class ArticleUpdate(SuccessMessageMixin, UpdateView): model = Article template_name = "kfet/article_update.html" form_class = ArticleRestrictForm success_message = "Informations mises à jour pour l'article : %(name)s" # Surcharge de la validation def form_valid(self, form): # Checking permission if not self.request.user.has_perm("kfet.change_article"): form.add_error(None, "Permission refusée") return self.form_invalid(form) # Save ici pour save le manytomany suppliers article = form.save() # Save des suppliers déjà existant for supplier in form.cleaned_data["suppliers"]: if supplier not in article.suppliers.all(): SupplierArticle.objects.create(article=article, supplier=supplier) # On vire les suppliers désélectionnés for supplier in article.suppliers.all(): if supplier not in form.cleaned_data["suppliers"]: SupplierArticle.objects.filter( article=article, supplier=supplier ).delete() # Nouveau supplier supplier_new = form.cleaned_data["supplier_new"].strip() if supplier_new: supplier, created = Supplier.objects.get_or_create(name=supplier_new) if created: SupplierArticle.objects.create(article=article, supplier=supplier) # Updating return super().form_valid(form) class ArticleDelete(PermissionRequiredMixin, DeleteView): model = Article success_url = reverse_lazy("kfet.article") success_message = "Article supprimé avec succès !" permission_required = "kfet.delete_article" def get(self, request, *args, **kwargs): return redirect("kfet.article.read", self.kwargs.get(self.pk_url_kwarg)) def delete(self, request, *args, **kwargs): messages.success(request, self.success_message) return super().delete(request, *args, **kwargs) # ----- # K-Psul # ----- @teamkfet_required def kpsul(request): data = {} data["operationgroup_form"] = KPsulOperationGroupForm() data["trigramme_form"] = KPsulAccountForm() data["checkout_form"] = KPsulCheckoutForm() data["operation_formset"] = KPsulOperationFormSet(queryset=Operation.objects.none()) return render(request, "kfet/kpsul.html", data) @teamkfet_required def kpsul_get_settings(request): addcost_for = kfet_config.addcost_for data = { "subvention_cof": kfet_config.subvention_cof, "addcost_for": addcost_for and addcost_for.trigramme or "", "addcost_amount": kfet_config.addcost_amount, } return JsonResponse(data) @teamkfet_required def account_read_json(request, trigramme): account = get_object_or_404(Account, trigramme=trigramme) if not account.readable: raise Http404 data = { "id": account.pk, "name": account.name, "email": account.email, "is_cof": account.is_cof, "promo": account.promo, "balance": account.balance, "is_frozen": account.is_frozen, "departement": account.departement, "nickname": account.nickname, "trigramme": account.trigramme, } return JsonResponse(data) @teamkfet_required def kpsul_checkout_data(request): pk = request.POST.get("pk", 0) if not pk: pk = 0 data = ( Checkout.objects.annotate( last_statement_by_first_name=F( "statements__by__cofprofile__user__first_name" ), last_statement_by_last_name=F( "statements__by__cofprofile__user__last_name" ), last_statement_by_trigramme=F("statements__by__trigramme"), last_statement_balance=F("statements__balance_new"), last_statement_at=F("statements__at"), ) .select_related( "statements" "statements__by", "statements__by__cofprofile__user" ) .filter(pk=pk) .order_by("statements__at") .values( "id", "name", "balance", "valid_from", "valid_to", "last_statement_balance", "last_statement_at", "last_statement_by_trigramme", "last_statement_by_last_name", "last_statement_by_first_name", ) .last() ) if data is None: raise Http404 return JsonResponse(data) @teamkfet_required @kfet_password_auth def kpsul_update_addcost(request): addcost_form = AddcostForm(request.POST) if not addcost_form.is_valid(): data = {"errors": {"addcost": list(addcost_form.errors)}} return JsonResponse(data, status=400) required_perms = ["kfet.manage_addcosts"] if not request.user.has_perms(required_perms): data = { "errors": {"missing_perms": get_missing_perms(required_perms, request.user)} } return JsonResponse(data, status=403) trigramme = addcost_form.cleaned_data["trigramme"] account = trigramme and Account.objects.get(trigramme=trigramme) or None amount = addcost_form.cleaned_data["amount"] kfet_config.set(addcost_for=account, addcost_amount=amount) data = {"addcost": {"for": account and account.trigramme or None, "amount": amount}} consumers.KPsul.group_send("kfet.kpsul", data) return JsonResponse(data) def get_missing_perms(required_perms: List[str], user: User) -> List[str]: def get_perm_description(app_label: str, codename: str) -> str: name = Permission.objects.values_list("name", flat=True).get( codename=codename, content_type__app_label=app_label ) return "[{}] {}".format(app_label, name) missing_perms = [ get_perm_description(*perm.split(".")) for perm in required_perms if not user.has_perm(perm) ] return missing_perms @teamkfet_required @kfet_password_auth def kpsul_perform_operations(request): # Initializing response data data = {"operationgroup": 0, "operations": [], "warnings": {}, "errors": {}} # Checking operationgroup operationgroup_form = KPsulOperationGroupForm(request.POST) if not operationgroup_form.is_valid(): data["errors"]["operation_group"] = list(operationgroup_form.errors) # Checking operation_formset operation_formset = KPsulOperationFormSet(request.POST) if not operation_formset.is_valid(): data["errors"]["operations"] = list(operation_formset.errors) # Returning BAD REQUEST if errors if data["errors"]: return JsonResponse(data, status=400) # Pre-saving (no commit) operationgroup = operationgroup_form.save(commit=False) operations = operation_formset.save(commit=False) # Retrieving COF grant cof_grant = kfet_config.subvention_cof # Retrieving addcosts data addcost_amount = kfet_config.addcost_amount addcost_for = kfet_config.addcost_for # Initializing vars required_perms = set() # Required perms to perform all operations cof_grant_divisor = 1 + cof_grant / 100 to_addcost_for_balance = 0 # For balance of addcost_for to_checkout_balance = 0 # For balance of selected checkout to_articles_stocks = defaultdict(lambda: 0) # For stocks articles is_addcost = all( (addcost_for, addcost_amount, addcost_for != operationgroup.on_acc) ) need_comment = operationgroup.on_acc.need_comment # Filling data of each operations # + operationgroup + calculating other stuffs for operation in operations: if operation.type == Operation.PURCHASE: operation.amount = -operation.article.price * operation.article_nb if is_addcost & operation.article.category.has_addcost: operation.addcost_for = addcost_for operation.addcost_amount = addcost_amount * operation.article_nb operation.amount -= operation.addcost_amount to_addcost_for_balance += operation.addcost_amount if operationgroup.on_acc.is_cash: to_checkout_balance += -operation.amount if ( operationgroup.on_acc.is_cof and operation.article.category.has_reduction ): if is_addcost and operation.article.category.has_addcost: operation.addcost_amount /= cof_grant_divisor operation.amount = operation.amount / cof_grant_divisor to_articles_stocks[operation.article] -= operation.article_nb else: if operationgroup.on_acc.is_cash: data["errors"]["account"] = "LIQ" if operation.type != Operation.EDIT: to_checkout_balance += operation.amount operationgroup.amount += operation.amount if operation.type == Operation.DEPOSIT: required_perms.add("kfet.perform_deposit") if operation.type == Operation.EDIT: required_perms.add("kfet.edit_balance_account") need_comment = True if operationgroup.on_acc.is_cof: to_addcost_for_balance = to_addcost_for_balance / cof_grant_divisor (perms, stop) = operationgroup.on_acc.perms_to_perform_operation( amount=operationgroup.amount ) required_perms |= perms if need_comment: operationgroup.comment = operationgroup.comment.strip() if not operationgroup.comment: data["errors"]["need_comment"] = True if data["errors"]: return JsonResponse(data, status=400) if stop or not request.user.has_perms(required_perms): missing_perms = get_missing_perms(required_perms, request.user) if missing_perms: data["errors"]["missing_perms"] = missing_perms if stop: data["errors"]["negative"] = [operationgroup.on_acc.trigramme] return JsonResponse(data, status=403) # If 1 perm is required, filling who perform the operations if required_perms: operationgroup.valid_by = request.user.profile.account_kfet # Filling cof status for statistics operationgroup.is_cof = operationgroup.on_acc.is_cof # Starting transaction to ensure data consistency with transaction.atomic(): # If not cash account, # saving account's balance and adding to Negative if not in on_acc = operationgroup.on_acc if not on_acc.is_cash: ( Account.objects.filter(pk=on_acc.pk).update( balance=F("balance") + operationgroup.amount ) ) on_acc.refresh_from_db() on_acc.update_negative() # Updating checkout's balance if to_checkout_balance: Checkout.objects.filter(pk=operationgroup.checkout.pk).update( balance=F("balance") + to_checkout_balance ) # Saving addcost_for with new balance if there is one if is_addcost and to_addcost_for_balance: Account.objects.filter(pk=addcost_for.pk).update( balance=F("balance") + to_addcost_for_balance ) # Saving operation group operationgroup.save() data["operationgroup"] = operationgroup.pk # Filling operationgroup id for each operations and saving for operation in operations: operation.group = operationgroup operation.save() data["operations"].append(operation.pk) # Updating articles stock for article in to_articles_stocks: Article.objects.filter(pk=article.pk).update( stock=F("stock") + to_articles_stocks[article] ) # Websocket data websocket_data = {} websocket_data["groups"] = [ { "add": True, "type": "operation", "id": operationgroup.pk, "amount": operationgroup.amount, "checkout__name": operationgroup.checkout.name, "at": operationgroup.at, "is_cof": operationgroup.is_cof, "comment": operationgroup.comment, "valid_by__trigramme": ( operationgroup.valid_by and operationgroup.valid_by.trigramme or None ), "on_acc__trigramme": operationgroup.on_acc.trigramme, "entries": [], } ] for operation in operations: ope_data = { "id": operation.pk, "type": operation.type, "amount": operation.amount, "addcost_amount": operation.addcost_amount, "addcost_for__trigramme": ( operation.addcost_for and addcost_for.trigramme or None ), "article__name": (operation.article and operation.article.name or None), "article_nb": operation.article_nb, "group_id": operationgroup.pk, "canceled_by__trigramme": None, "canceled_at": None, } websocket_data["groups"][0]["entries"].append(ope_data) # Need refresh from db cause we used update on queryset operationgroup.checkout.refresh_from_db() websocket_data["checkouts"] = [ {"id": operationgroup.checkout.pk, "balance": operationgroup.checkout.balance} ] websocket_data["articles"] = [] # Need refresh from db cause we used update on querysets articles_pk = [article.pk for article in to_articles_stocks] articles = Article.objects.values("id", "stock").filter(pk__in=articles_pk) for article in articles: websocket_data["articles"].append( {"id": article["id"], "stock": article["stock"]} ) consumers.KPsul.group_send("kfet.kpsul", websocket_data) return JsonResponse(data) @teamkfet_required @kfet_password_auth def cancel_operations(request): # Pour la réponse data = {"canceled": [], "warnings": {}, "errors": {}} # Checking if BAD REQUEST (opes_pk not int or not existing) try: # Set pour virer les doublons opes_post = set( map(int, filter(None, request.POST.getlist("operations[]", []))) ) except ValueError: return JsonResponse(data, status=400) opes_all = Operation.objects.select_related( "group", "group__on_acc", "group__on_acc__negative" ).filter(pk__in=opes_post) opes_pk = [ope.pk for ope in opes_all] opes_notexisting = [ope for ope in opes_post if ope not in opes_pk] if opes_notexisting: data["errors"]["opes_notexisting"] = opes_notexisting return JsonResponse(data, status=400) opes_already_canceled = [] # Déjà annulée opes = [] # Pas déjà annulée required_perms = set() stop_all = False cancel_duration = kfet_config.cancel_duration to_accounts_balances = defaultdict( lambda: 0 ) # Modifs à faire sur les balances des comptes to_groups_amounts = defaultdict( lambda: 0 ) # ------ sur les montants des groupes d'opé to_checkouts_balances = defaultdict(lambda: 0) # ------ sur les balances de caisses to_articles_stocks = defaultdict(lambda: 0) # ------ sur les stocks d'articles for ope in opes_all: if ope.canceled_at: # Opération déjà annulée, va pour un warning en Response opes_already_canceled.append(ope.pk) else: opes.append(ope.pk) # Si opé il y a plus de CANCEL_DURATION, permission requise if ope.group.at + cancel_duration < timezone.now(): required_perms.add("kfet.cancel_old_operations") # Calcul de toutes modifs à faire en cas de validation # Pour les balances de comptes if not ope.group.on_acc.is_cash: to_accounts_balances[ope.group.on_acc] -= ope.amount if ope.addcost_for and ope.addcost_amount: to_accounts_balances[ope.addcost_for] -= ope.addcost_amount # Pour les groupes d'opés to_groups_amounts[ope.group] -= ope.amount # Pour les balances de caisses # Les balances de caisses dont il y a eu un relevé depuis la date # de la commande ne doivent pas être modifiées # TODO : Prendre en compte le dernier relevé où la caisse a été # comptée et donc modifier les balance_old (et amount_error) # des relevés suivants. # Note : Dans le cas où un CheckoutStatement est mis à jour # par `.save()`, amount_error est recalculé automatiquement, # ce qui n'est pas le cas en faisant un update sur queryset # TODO ? : Maj les balance_old de relevés pour modifier l'erreur last_statement = ( CheckoutStatement.objects.filter(checkout=ope.group.checkout) .order_by("at") .last() ) if not last_statement or last_statement.at < ope.group.at: if ope.is_checkout: if ope.group.on_acc.is_cash: to_checkouts_balances[ope.group.checkout] -= -ope.amount else: to_checkouts_balances[ope.group.checkout] -= ope.amount # Pour les stocks d'articles # Les stocks d'articles dont il y a eu un inventaire depuis la date # de la commande ne doivent pas être modifiés # TODO : Prendre en compte le dernier inventaire où le stock a bien # été compté (pas dans le cas d'une livraison). # Note : si InventoryArticle est maj par .save(), stock_error # est recalculé automatiquement if ope.article and ope.article_nb: last_stock = ( InventoryArticle.objects.select_related("inventory") .filter(article=ope.article) .order_by("inventory__at") .last() ) if not last_stock or last_stock.inventory.at < ope.group.at: to_articles_stocks[ope.article] += ope.article_nb if not opes: data["warnings"]["already_canceled"] = opes_already_canceled return JsonResponse(data) negative_accounts = [] # Checking permissions or stop for account in to_accounts_balances: (perms, stop) = account.perms_to_perform_operation( amount=to_accounts_balances[account] ) required_perms |= perms stop_all = stop_all or stop if stop: negative_accounts.append(account.trigramme) if stop_all or not request.user.has_perms(required_perms): missing_perms = get_missing_perms(required_perms, request.user) if missing_perms: data["errors"]["missing_perms"] = missing_perms if stop_all: data["errors"]["negative"] = negative_accounts return JsonResponse(data, status=403) canceled_by = required_perms and request.user.profile.account_kfet or None canceled_at = timezone.now() with transaction.atomic(): ( Operation.objects.filter(pk__in=opes).update( canceled_by=canceled_by, canceled_at=canceled_at ) ) for account in to_accounts_balances: ( Account.objects.filter(pk=account.pk).update( balance=F("balance") + to_accounts_balances[account] ) ) if not account.is_cash: # Should always be true, but we want to be sure account.refresh_from_db() account.update_negative() for checkout in to_checkouts_balances: Checkout.objects.filter(pk=checkout.pk).update( balance=F("balance") + to_checkouts_balances[checkout] ) for group in to_groups_amounts: OperationGroup.objects.filter(pk=group.pk).update( amount=F("amount") + to_groups_amounts[group] ) for article in to_articles_stocks: Article.objects.filter(pk=article.pk).update( stock=F("stock") + to_articles_stocks[article] ) # Need refresh from db cause we used update on querysets. # Sort objects by pk to get deterministic responses. opegroups_pk = [opegroup.pk for opegroup in to_groups_amounts] opegroups = ( OperationGroup.objects.values("id", "amount", "is_cof") .filter(pk__in=opegroups_pk) .order_by("pk") ) opes = ( Operation.objects.values("id", "canceled_at", "canceled_by__trigramme") .filter(pk__in=opes) .order_by("pk") ) checkouts_pk = [checkout.pk for checkout in to_checkouts_balances] checkouts = ( Checkout.objects.values("id", "balance") .filter(pk__in=checkouts_pk) .order_by("pk") ) articles_pk = [article.pk for articles in to_articles_stocks] articles = Article.objects.values("id", "stock").filter(pk__in=articles_pk) # Websocket data websocket_data = {"checkouts": [], "articles": []} for checkout in checkouts: websocket_data["checkouts"].append( {"id": checkout["id"], "balance": checkout["balance"]} ) for article in articles: websocket_data["articles"].append( {"id": article["id"], "stock": article["stock"]} ) consumers.KPsul.group_send("kfet.kpsul", websocket_data) data["canceled"] = list(opes) data["opegroups_to_update"] = list(opegroups) if opes_already_canceled: data["warnings"]["already_canceled"] = opes_already_canceled return JsonResponse(data) @login_required def history_json(request): # Récupération des paramètres from_date = request.POST.get("from", None) to_date = request.POST.get("to", None) checkouts = request.POST.getlist("checkouts[]", None) accounts = request.POST.getlist("accounts[]", None) transfers_only = request.POST.get("transfersonly", False) opes_only = request.POST.get("opesonly", False) # Construction de la requête (sur les transferts) pour le prefetch transfer_queryset_prefetch = Transfer.objects.select_related( "from_acc", "to_acc", "canceled_by" ) # Le check sur les comptes est dans le prefetch pour les transferts if accounts: transfer_queryset_prefetch = transfer_queryset_prefetch.filter( Q(from_acc__in=accounts) | Q(to_acc__in=accounts) ) if not request.user.has_perm("kfet.is_team"): try: acc = request.user.profile.account_kfet transfer_queryset_prefetch = transfer_queryset_prefetch.filter( Q(from_acc=acc) | Q(to_acc=acc) ) except Account.DoesNotExist: return JsonResponse({}, status=403) transfer_prefetch = Prefetch( "transfers", queryset=transfer_queryset_prefetch, to_attr="filtered_transfers" ) # Construction de la requête (sur les opérations) pour le prefetch ope_queryset_prefetch = Operation.objects.select_related( "article", "canceled_by", "addcost_for" ) ope_prefetch = Prefetch("opes", queryset=ope_queryset_prefetch) # Construction de la requête principale opegroups = ( OperationGroup.objects.prefetch_related(ope_prefetch) .select_related("on_acc", "valid_by") .order_by("at") ) transfergroups = ( TransferGroup.objects.prefetch_related(transfer_prefetch) .select_related("valid_by") .order_by("at") ) # Application des filtres if from_date: opegroups = opegroups.filter(at__gte=from_date) transfergroups = transfergroups.filter(at__gte=from_date) if to_date: opegroups = opegroups.filter(at__lt=to_date) transfergroups = transfergroups.filter(at__lt=to_date) if checkouts: opegroups = opegroups.filter(checkout__in=checkouts) transfergroups = TransferGroup.objects.none() if transfers_only: opegroups = OperationGroup.objects.none() if opes_only: transfergroups = TransferGroup.objects.none() if accounts: opegroups = opegroups.filter(on_acc__in=accounts) # Un non-membre de l'équipe n'a que accès à son historique if not request.user.has_perm("kfet.is_team"): opegroups = opegroups.filter(on_acc=request.user.profile.account_kfet) # Construction de la réponse history_groups = [] for opegroup in opegroups: opegroup_dict = { "type": "operation", "id": opegroup.id, "amount": opegroup.amount, "at": opegroup.at, "checkout_id": opegroup.checkout_id, "is_cof": opegroup.is_cof, "comment": opegroup.comment, "entries": [], "on_acc__trigramme": opegroup.on_acc and opegroup.on_acc.trigramme or None, } if request.user.has_perm("kfet.is_team"): opegroup_dict["valid_by__trigramme"] = ( opegroup.valid_by and opegroup.valid_by.trigramme or None ) for ope in opegroup.opes.all(): ope_dict = { "id": ope.id, "type": ope.type, "amount": ope.amount, "article_nb": ope.article_nb, "addcost_amount": ope.addcost_amount, "canceled_at": ope.canceled_at, "article__name": ope.article and ope.article.name or None, "addcost_for__trigramme": ope.addcost_for and ope.addcost_for.trigramme or None, } if request.user.has_perm("kfet.is_team"): ope_dict["canceled_by__trigramme"] = ( ope.canceled_by and ope.canceled_by.trigramme or None ) opegroup_dict["entries"].append(ope_dict) history_groups.append(opegroup_dict) for transfergroup in transfergroups: if transfergroup.filtered_transfers: transfergroup_dict = { "type": "transfer", "id": transfergroup.id, "at": transfergroup.at, "comment": transfergroup.comment, "entries": [], } if request.user.has_perm("kfet.is_team"): transfergroup_dict["valid_by__trigramme"] = ( transfergroup.valid_by and transfergroup.valid_by.trigramme or None ) for transfer in transfergroup.filtered_transfers: transfer_dict = { "id": transfer.id, "amount": transfer.amount, "canceled_at": transfer.canceled_at, "from_acc": transfer.from_acc.trigramme, "to_acc": transfer.to_acc.trigramme, } if request.user.has_perm("kfet.is_team"): transfer_dict["canceled_by__trigramme"] = ( transfer.canceled_by and transfer.canceled_by.trigramme or None ) transfergroup_dict["entries"].append(transfer_dict) history_groups.append(transfergroup_dict) history_groups.sort(key=lambda group: group["at"]) return JsonResponse({"groups": history_groups}) @teamkfet_required def kpsul_articles_data(request): articles = Article.objects.values( "id", "name", "price", "stock", "category_id", "category__name", "category__has_addcost", "category__has_reduction", ).filter(is_sold=True) return JsonResponse({"articles": list(articles)}) @teamkfet_required def history(request): data = {"filter_form": FilterHistoryForm()} return render(request, "kfet/history.html", data) # ----- # Settings views # ----- class SettingsList(TemplateView): template_name = "kfet/settings.html" config_list = permission_required("kfet.see_config")(SettingsList.as_view()) @method_decorator(kfet_password_auth, name="dispatch") class SettingsUpdate(SuccessMessageMixin, FormView): form_class = KFetConfigForm template_name = "kfet/settings_update.html" success_message = "Paramètres mis à jour" success_url = reverse_lazy("kfet.settings") def form_valid(self, form): # Checking permission if not self.request.user.has_perm("kfet.change_config"): form.add_error(None, "Permission refusée") return self.form_invalid(form) form.save() return super().form_valid(form) config_update = permission_required("kfet.change_config")(SettingsUpdate.as_view()) # ----- # Transfer views # ----- @method_decorator(teamkfet_required, name="dispatch") class TransferView(TemplateView): template_name = "kfet/transfers.html" @teamkfet_required def transfers_create(request): transfer_formset = TransferFormSet(queryset=Transfer.objects.none()) return render( request, "kfet/transfers_create.html", {"transfer_formset": transfer_formset} ) @teamkfet_required @kfet_password_auth def perform_transfers(request): data = {"errors": {}, "transfers": [], "transfergroup": 0} # Checking transfer_formset transfer_formset = TransferFormSet(request.POST) if not transfer_formset.is_valid(): return JsonResponse({"errors": list(transfer_formset.errors)}, status=400) transfers = transfer_formset.save(commit=False) # Initializing vars required_perms = set( ["kfet.add_transfer"] ) # Required perms to perform all transfers to_accounts_balances = defaultdict(lambda: 0) # For balances of accounts for transfer in transfers: to_accounts_balances[transfer.from_acc] -= transfer.amount to_accounts_balances[transfer.to_acc] += transfer.amount stop_all = False negative_accounts = [] # Checking if ok on all accounts for account in to_accounts_balances: (perms, stop) = account.perms_to_perform_operation( amount=to_accounts_balances[account] ) required_perms |= perms stop_all = stop_all or stop if stop: negative_accounts.append(account.trigramme) if stop_all or not request.user.has_perms(required_perms): missing_perms = get_missing_perms(required_perms, request.user) if missing_perms: data["errors"]["missing_perms"] = missing_perms if stop_all: data["errors"]["negative"] = negative_accounts return JsonResponse(data, status=403) # Creating transfer group transfergroup = TransferGroup() if required_perms: transfergroup.valid_by = request.user.profile.account_kfet comment = request.POST.get("comment", "") transfergroup.comment = comment.strip() with transaction.atomic(): # Updating balances accounts for account in to_accounts_balances: Account.objects.filter(pk=account.pk).update( balance=F("balance") + to_accounts_balances[account] ) account.refresh_from_db() if account.balance < 0: if hasattr(account, "negative"): if not account.negative.start: account.negative.start = timezone.now() account.negative.save() else: negative = AccountNegative(account=account, start=timezone.now()) negative.save() elif hasattr(account, "negative") and not account.negative.balance_offset: account.negative.delete() # Saving transfer group transfergroup.save() data["transfergroup"] = transfergroup.pk # Saving all transfers with group for transfer in transfers: transfer.group = transfergroup transfer.save() data["transfers"].append(transfer.pk) return JsonResponse(data) @teamkfet_required @kfet_password_auth def cancel_transfers(request): # Pour la réponse data = {"canceled": [], "warnings": {}, "errors": {}} # Checking if BAD REQUEST (transfers_pk not int or not existing) try: # Set pour virer les doublons transfers_post = set( map(int, filter(None, request.POST.getlist("transfers[]", []))) ) except ValueError: return JsonResponse(data, status=400) transfers_all = Transfer.objects.select_related( "group", "from_acc", "from_acc__negative", "to_acc", "to_acc__negative" ).filter(pk__in=transfers_post) transfers_pk = [transfer.pk for transfer in transfers_all] transfers_notexisting = [ transfer for transfer in transfers_post if transfer not in transfers_pk ] if transfers_notexisting: data["errors"]["transfers_notexisting"] = transfers_notexisting return JsonResponse(data, status=400) transfers_already_canceled = [] # Déjà annulée transfers = [] # Pas déjà annulée required_perms = set() stop_all = False cancel_duration = kfet_config.cancel_duration to_accounts_balances = defaultdict( lambda: 0 ) # Modifs à faire sur les balances des comptes for transfer in transfers_all: if transfer.canceled_at: # Transfert déjà annulé, va pour un warning en Response transfers_already_canceled.append(transfer.pk) else: transfers.append(transfer.pk) # Si transfer il y a plus de CANCEL_DURATION, permission requise if transfer.group.at + cancel_duration < timezone.now(): required_perms.add("kfet.cancel_old_operations") # Calcul de toutes modifs à faire en cas de validation # Pour les balances de comptes to_accounts_balances[transfer.from_acc] += transfer.amount to_accounts_balances[transfer.to_acc] += -transfer.amount if not transfers: data["warnings"]["already_canceled"] = transfers_already_canceled return JsonResponse(data) negative_accounts = [] # Checking permissions or stop for account in to_accounts_balances: (perms, stop) = account.perms_to_perform_operation( amount=to_accounts_balances[account] ) required_perms |= perms stop_all = stop_all or stop if stop: negative_accounts.append(account.trigramme) if stop_all or not request.user.has_perms(required_perms): missing_perms = get_missing_perms(required_perms, request.user) if missing_perms: data["errors"]["missing_perms"] = missing_perms if stop_all: data["errors"]["negative"] = negative_accounts return JsonResponse(data, status=403) canceled_by = required_perms and request.user.profile.account_kfet or None canceled_at = timezone.now() with transaction.atomic(): ( Transfer.objects.filter(pk__in=transfers).update( canceled_by=canceled_by, canceled_at=canceled_at ) ) for account in to_accounts_balances: Account.objects.filter(pk=account.pk).update( balance=F("balance") + to_accounts_balances[account] ) account.refresh_from_db() if account.balance < 0: if hasattr(account, "negative"): if not account.negative.start: account.negative.start = timezone.now() account.negative.save() else: negative = AccountNegative(account=account, start=timezone.now()) negative.save() elif hasattr(account, "negative") and not account.negative.balance_offset: account.negative.delete() transfers = ( Transfer.objects.values("id", "canceled_at", "canceled_by__trigramme") .filter(pk__in=transfers) .order_by("pk") ) data["canceled"] = list(transfers) if transfers_already_canceled: data["warnings"]["already_canceled"] = transfers_already_canceled return JsonResponse(data) class InventoryList(ListView): queryset = ( Inventory.objects.select_related("by", "order") .annotate(nb_articles=Count("articles")) .order_by("-at") ) template_name = "kfet/inventory.html" context_object_name = "inventories" @teamkfet_required @kfet_password_auth def inventory_create(request): articles = Article.objects.select_related("category").order_by( "category__name", "name" ) initial = [] for article in articles: initial.append( { "article": article.pk, "stock_old": article.stock, "name": article.name, "category": article.category_id, "category__name": article.category.name, "box_capacity": article.box_capacity or 0, } ) cls_formset = formset_factory(form=InventoryArticleForm, extra=0) if request.POST: formset = cls_formset(request.POST, initial=initial) if not request.user.has_perm("kfet.add_inventory"): messages.error(request, "Permission refusée") elif formset.is_valid(): with transaction.atomic(): articles = Article.objects.select_for_update() inventory = Inventory() inventory.by = request.user.profile.account_kfet saved = False for form in formset: if form.cleaned_data["stock_new"] is not None: if not saved: inventory.save() saved = True article = articles.get(pk=form.cleaned_data["article"].pk) stock_old = article.stock stock_new = form.cleaned_data["stock_new"] InventoryArticle.objects.create( inventory=inventory, article=article, stock_old=stock_old, stock_new=stock_new, ) article.stock = stock_new article.save() if saved: messages.success(request, "Inventaire créé") return redirect("kfet.inventory") messages.warning(request, "Bah alors ? On a rien compté ?") else: messages.error(request, "Pas marché") else: formset = cls_formset(initial=initial) return render(request, "kfet/inventory_create.html", {"formset": formset}) class InventoryRead(DetailView): model = Inventory template_name = "kfet/inventory_read.html" context_object_name = "inventory" def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) inventoryarticles = ( InventoryArticle.objects.select_related("article", "article__category") .filter(inventory=self.object) .order_by("article__category__name", "article__name") ) context["inventoryarts"] = inventoryarticles return context # ----- # Order views # ----- class OrderList(ListView): queryset = Order.objects.select_related("supplier", "inventory") template_name = "kfet/order.html" context_object_name = "orders" def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) context["suppliers"] = Supplier.objects.order_by("name") return context @teamkfet_required @kfet_password_auth def order_create(request, pk): supplier = get_object_or_404(Supplier, pk=pk) articles = ( Article.objects.filter(suppliers=supplier.pk) .distinct() .select_related("category") .order_by("category__name", "name") ) # Force hit to cache articles = list(articles) sales_q = ( Operation.objects.select_related("group") .filter(article__in=articles, canceled_at=None) .values("article") .annotate(nb=Sum("article_nb")) ) scale = WeekScale(last=True, n_steps=5, std_chunk=False) chunks = scale.chunkify_qs(sales_q, field="group__at") sales = [{d["article"]: d["nb"] for d in chunk} for chunk in chunks] initial = [] for article in articles: # Get sales for each 5 last weeks v_all = [chunk.get(article.pk, 0) for chunk in sales] # Take the 3 greatest (eg to avoid 2 weeks of vacations) v_3max = heapq.nlargest(3, v_all) # Get average and standard deviation v_moy = statistics.mean(v_3max) v_et = statistics.pstdev(v_3max, v_moy) # Expected sales for next week v_prev = v_moy + v_et # We want to have 1.5 * the expected sales in stock # (because sometimes some articles are not delivered) c_rec_tot = max(v_prev * 1.5 - article.stock, 0) # If ordered quantity is close enough to a level which can led to free # boxes, we increase it to this level. if article.box_capacity: c_rec_temp = c_rec_tot / article.box_capacity if c_rec_temp >= 10: c_rec = round(c_rec_temp) elif c_rec_temp > 5: c_rec = 10 elif c_rec_temp > 2: c_rec = 5 else: c_rec = round(c_rec_temp) initial.append( { "article": article.pk, "name": article.name, "category": article.category_id, "category__name": article.category.name, "stock": article.stock, "box_capacity": article.box_capacity, "v_all": v_all, "v_moy": round(v_moy), "v_et": round(v_et), "v_prev": round(v_prev), "c_rec": article.box_capacity and c_rec or round(c_rec_tot), } ) cls_formset = formset_factory(form=OrderArticleForm, extra=0) if request.POST: formset = cls_formset(request.POST, initial=initial) if not request.user.has_perm("kfet.add_order"): messages.error(request, "Permission refusée") elif formset.is_valid(): order = Order() order.supplier = supplier saved = False for form in formset: if form.cleaned_data["quantity_ordered"] is not None: if not saved: order.save() saved = True article = form.cleaned_data["article"] q_ordered = form.cleaned_data["quantity_ordered"] if article.box_capacity: q_ordered *= article.box_capacity OrderArticle.objects.create( order=order, article=article, quantity_ordered=q_ordered ) if saved: messages.success(request, "Commande créée") return redirect("kfet.order.read", order.pk) messages.warning(request, "Rien commandé => Pas de commande") else: messages.error(request, "Corrigez les erreurs") else: formset = cls_formset(initial=initial) scale.label_fmt = "S-{rev_i}" return render( request, "kfet/order_create.html", {"supplier": supplier, "formset": formset, "scale": scale}, ) class OrderRead(DetailView): model = Order template_name = "kfet/order_read.html" context_object_name = "order" def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) orderarticles = ( OrderArticle.objects.select_related("article", "article__category") .filter(order=self.object) .order_by("article__category__name", "article__name") ) context["orderarts"] = orderarticles mail = ( "Bonjour,\n\nNous voudrions pour le ##DATE## à la K-Fêt de " "l'ENS Ulm :" ) category = 0 for orderarticle in orderarticles: if category != orderarticle.article.category: category = orderarticle.article.category mail += "\n" nb = orderarticle.quantity_ordered box = "" if orderarticle.article.box_capacity: nb /= orderarticle.article.box_capacity if nb >= 2: box = " %ss de" % orderarticle.article.box_type else: box = " %s de" % orderarticle.article.box_type name = orderarticle.article.name.capitalize() mail += "\n- %s%s %s" % (round(nb), box, name) mail += ( "\n\nMerci d'appeler le numéro suivant lorsque les livreurs " "sont là : ##TELEPHONE##\nCordialement,\n##PRENOM## ##NOM## " ", pour la K-Fêt de l'ENS Ulm" ) context["mail"] = mail return context @teamkfet_required @kfet_password_auth def order_to_inventory(request, pk): order = get_object_or_404(Order, pk=pk) if hasattr(order, "inventory"): raise Http404 supplier_prefetch = Prefetch( "article__supplierarticle_set", queryset=( SupplierArticle.objects.filter(supplier=order.supplier).order_by("-at") ), to_attr="supplier", ) order_articles = ( OrderArticle.objects.filter(order=order.pk) .select_related("article", "article__category") .prefetch_related(supplier_prefetch) .order_by("article__category__name", "article__name") ) initial = [] for order_article in order_articles: article = order_article.article initial.append( { "article": article.pk, "name": article.name, "category": article.category_id, "category__name": article.category.name, "quantity_ordered": order_article.quantity_ordered, "quantity_received": order_article.quantity_ordered, "price_HT": article.supplier[0].price_HT, "TVA": article.supplier[0].TVA, "rights": article.supplier[0].rights, } ) cls_formset = formset_factory(OrderArticleToInventoryForm, extra=0) if request.method == "POST": formset = cls_formset(request.POST, initial=initial) if not request.user.has_perm("kfet.order_to_inventory"): messages.error(request, "Permission refusée") elif formset.is_valid(): with transaction.atomic(): inventory = Inventory.objects.create( order=order, by=request.user.profile.account_kfet ) new_supplierarticle = [] new_inventoryarticle = [] for form in formset: q_received = form.cleaned_data["quantity_received"] article = form.cleaned_data["article"] price_HT = form.cleaned_data["price_HT"] TVA = form.cleaned_data["TVA"] rights = form.cleaned_data["rights"] if any( ( form.initial["price_HT"] != price_HT, form.initial["TVA"] != TVA, form.initial["rights"] != rights, ) ): new_supplierarticle.append( SupplierArticle( supplier=order.supplier, article=article, price_HT=price_HT, TVA=TVA, rights=rights, ) ) ( OrderArticle.objects.filter( order=order, article=article ).update(quantity_received=q_received) ) new_inventoryarticle.append( InventoryArticle( inventory=inventory, article=article, stock_old=article.stock, stock_new=article.stock + q_received, ) ) article.stock += q_received if q_received > 0: article.is_sold = True article.save() SupplierArticle.objects.bulk_create(new_supplierarticle) InventoryArticle.objects.bulk_create(new_inventoryarticle) messages.success(request, "C'est tout bon !") return redirect("kfet.order") else: messages.error(request, "Corrigez les erreurs") else: formset = cls_formset(initial=initial) return render( request, "kfet/order_to_inventory.html", {"formset": formset, "order": order} ) @method_decorator(kfet_password_auth, name="dispatch") class SupplierUpdate(SuccessMessageMixin, UpdateView): model = Supplier template_name = "kfet/supplier_form.html" fields = ["name", "address", "email", "phone", "comment"] success_url = reverse_lazy("kfet.order") sucess_message = "Données fournisseur mis à jour" # Surcharge de la validation def form_valid(self, form): # Checking permission if not self.request.user.has_perm("kfet.change_supplier"): form.add_error(None, "Permission refusée") return self.form_invalid(form) # Updating return super().form_valid(form) # ========== # Statistics # ========== # --------------- # Vues génériques # --------------- # source : docs.djangoproject.com/fr/1.10/topics/class-based-views/mixins/ class JSONResponseMixin(object): """ A mixin that can be used to render a JSON response. """ def render_to_json_response(self, context, **response_kwargs): """ Returns a JSON response, transforming 'context' to make the payload. """ return JsonResponse(self.get_data(context), **response_kwargs) def get_data(self, context): """ Returns an object that will be serialized as JSON by json.dumps(). """ # Note: This is *EXTREMELY* naive; in reality, you'll need # to do much more complex handling to ensure that arbitrary # objects -- such as Django model instances or querysets # -- can be serialized as JSON. return context class JSONDetailView(JSONResponseMixin, BaseDetailView): """Returns a DetailView that renders a JSON.""" def render_to_response(self, context): return self.render_to_json_response(context) class PkUrlMixin(object): def get_object(self, *args, **kwargs): get_by = self.kwargs.get(self.pk_url_kwarg) return get_object_or_404(self.model, **{self.pk_url_kwarg: get_by}) class SingleResumeStat(JSONDetailView): """ Génère l'interface de sélection pour les statistiques d'un compte/article. L'interface est constituée d'une série de boutons, qui récupèrent et graphent des statistiques du même type, sur le même objet mais avec des arguments différents. Attributs : - url_stat : URL où récupérer les statistiques - stats : liste de dictionnaires avec les clés suivantes : - label : texte du bouton - url_params : paramètres GET à rajouter à `url_stat` - default : si `True`, graphe à montrer par défaut On peut aussi définir `stats` dynamiquement, via la fonction `get_stats`. """ id_prefix = "" nb_default = 0 stats = [] url_stat = None def get_context_data(self, **kwargs): # On n'hérite pas object_id = self.object.id context = {} stats = [] prefix = "{}_{}".format(self.id_prefix, object_id) for i, stat_def in enumerate(self.stats): url_pk = getattr(self.object, self.pk_url_kwarg) url_params_d = stat_def.get("url_params", {}) if len(url_params_d) > 0: url_params = "?{}".format(urlencode(url_params_d)) else: url_params = "" stats.append( { "label": stat_def["label"], "btn": "btn_{}_{}".format(prefix, i), "url": "{url}{params}".format( url=reverse(self.url_stat, args=[url_pk]), params=url_params ), } ) context["id_prefix"] = prefix context["content_id"] = "content_%s" % prefix context["stats"] = stats context["default_stat"] = self.nb_default context["object_id"] = object_id return context # ----------------------- # Evolution Balance perso # ----------------------- ID_PREFIX_ACC_BALANCE = "balance_acc" class AccountStatBalanceList(PkUrlMixin, SingleResumeStat): Menu général pour l'historique de balance d'un compte """ model = Account context_object_name = "account" pk_url_kwarg = "trigramme" url_stat = "kfet.account.stat.balance" id_prefix = ID_PREFIX_ACC_BALANCE stats = [ {"label": "Tout le temps"}, {"label": "1 an", "url_params": {"last_days": 365}}, {"label": "6 mois", "url_params": {"last_days": 183}}, {"label": "3 mois", "url_params": {"last_days": 90}}, {"label": "30 jours", "url_params": {"last_days": 30}}, ] nb_default = 0 def get_object(self, *args, **kwargs): obj = super().get_object(*args, **kwargs) if self.request.user != obj.user: raise Http404 return obj @method_decorator(login_required) def dispatch(self, *args, **kwargs): return super().dispatch(*args, **kwargs) class AccountStatBalance(PkUrlMixin, JSONDetailView): """ Statistiques (JSON) d'historique de balance d'un compte. Prend en compte les opérations et transferts sur la période donnée. """ model = Account pk_url_kwarg = "trigramme" context_object_name = "account" def get_changes_list(self, last_days=None, begin_date=None, end_date=None): account = self.object # prepare filters if last_days is not None: end_date = timezone.now() begin_date = end_date - timezone.timedelta(days=last_days) # prepare querysets # TODO: retirer les opgroup dont tous les op sont annulées opegroups = OperationGroup.objects.filter(on_acc=account) transfers = Transfer.objects.filter(canceled_at=None).select_related("group") recv_transfers = transfers.filter(to_acc=account) sent_transfers = transfers.filter(from_acc=account) # apply filters if begin_date is not None: opegroups = opegroups.filter(at__gte=begin_date) recv_transfers = recv_transfers.filter(group__at__gte=begin_date) sent_transfers = sent_transfers.filter(group__at__gte=begin_date) if end_date is not None: opegroups = opegroups.filter(at__lte=end_date) recv_transfers = recv_transfers.filter(group__at__lte=end_date) sent_transfers = sent_transfers.filter(group__at__lte=end_date) # On transforme tout ça en une liste de dictionnaires sous la forme # {'at': date, # 'amount': changement de la balance (négatif si diminue la balance, # positif si l'augmente), # 'label': text descriptif, # 'balance': état de la balance après l'action (0 pour le moment, # sera mis à jour lors d'une # autre passe) # } actions = [] actions.append( { "at": (begin_date or account.created_at).isoformat(), "amount": 0, "balance": 0, } ) actions.append( {"at": (end_date or timezone.now()).isoformat(), "amount": 0, "balance": 0} ) actions += ( [ {"at": ope_grp.at.isoformat(), "amount": ope_grp.amount, "balance": 0} for ope_grp in opegroups ] + [ {"at": tr.group.at.isoformat(), "amount": tr.amount, "balance": 0} for tr in recv_transfers ] + [ {"at": tr.group.at.isoformat(), "amount": -tr.amount, "balance": 0} for tr in sent_transfers ] ) # Maintenant on trie la liste des actions par ordre du plus récent # an plus ancien et on met à jour la balance if len(actions) > 1: actions = sorted(actions, key=lambda k: k["at"], reverse=True) actions[0]["balance"] = account.balance for i in range(len(actions) - 1): actions[i + 1]["balance"] = ( actions[i]["balance"] - actions[i + 1]["amount"] ) return actions def get_context_data(self, *args, **kwargs): context = {} last_days = self.request.GET.get("last_days", None) if last_days is not None: last_days = int(last_days) begin_date = self.request.GET.get("begin_date", None) end_date = self.request.GET.get("end_date", None) changes = self.get_changes_list( last_days=last_days, begin_date=begin_date, end_date=end_date ) context["charts"] = [ {"color": "rgb(200, 20, 60)", "label": "Balance", "values": changes} ] context["is_time_chart"] = True if len(changes) > 0: context["min_date"] = changes[-1]["at"] context["max_date"] = changes[0]["at"] # TODO: offset return context def get_object(self, *args, **kwargs): obj = super().get_object(*args, **kwargs) if self.request.user != obj.user: raise Http404 return obj @method_decorator(login_required) def dispatch(self, *args, **kwargs): return super().dispatch(*args, **kwargs) # ------------------------ # Consommation personnelle # ------------------------ ID_PREFIX_ACC_LAST = "last_acc" ID_PREFIX_ACC_LAST_DAYS = "last_days_acc" ID_PREFIX_ACC_LAST_WEEKS = "last_weeks_acc" ID_PREFIX_ACC_LAST_MONTHS = "last_months_acc" class AccountStatOperationList(PkUrlMixin, SingleResumeStat): @method_decorator(login_required, name="dispatch") """ Menu général pour l'historique de consommation d'un compte """ model = Account context_object_name = "account" pk_url_kwarg = "trigramme" id_prefix = ID_PREFIX_ACC_LAST nb_default = 2 stats = last_stats_manifest(types=[Operation.PURCHASE]) url_stat = "kfet.account.stat.operation" def get_object(self, *args, **kwargs): obj = super().get_object(*args, **kwargs) if self.request.user != obj.user: raise Http404 return obj @method_decorator(login_required) def dispatch(self, *args, **kwargs): return super().dispatch(*args, **kwargs) class AccountStatOperation(ScaleMixin, PkUrlMixin, JSONDetailView): @method_decorator(login_required, name="dispatch") """ Statistiques (JSON) de consommation (nb d'items achetés) d'un compte. """ model = Account pk_url_kwarg = "trigramme" context_object_name = "account" id_prefix = "" def get_operations(self, scale, types=None): # On selectionne les opérations qui correspondent # à l'article en question et qui ne sont pas annulées # puis on choisi pour chaques intervalle les opérations # effectuées dans ces intervalles de temps all_operations = ( Operation.objects.filter(group__on_acc=self.object, canceled_at=None) .values("article_nb", "group__at") .order_by("group__at") ) if types is not None: all_operations = all_operations.filter(type__in=types) chunks = scale.get_by_chunks( all_operations, field_db="group__at", field_callback=(lambda d: d["group__at"]), ) return chunks def get_context_data(self, *args, **kwargs): old_ctx = super().get_context_data(*args, **kwargs) context = {"labels": old_ctx["labels"]} scale = self.scale types = self.request.GET.get("types", None) if types is not None: types = ast.literal_eval(types) operations = self.get_operations(types=types, scale=scale) # On compte les opérations nb_ventes = [] for chunk in operations: ventes = sum(ope["article_nb"] for ope in chunk) nb_ventes.append(ventes) context["charts"] = [ { "color": "rgb(200, 20, 60)", "label": "NB items achetés", "values": nb_ventes, } ] return context def get_object(self, *args, **kwargs): obj = super().get_object(*args, **kwargs) if self.request.user != obj.user: raise Http404 return obj @method_decorator(login_required) def dispatch(self, *args, **kwargs): return super().dispatch(*args, **kwargs) # ------------------------ # Article Satistiques Last # ------------------------ ID_PREFIX_ART_LAST = "last_art" ID_PREFIX_ART_LAST_DAYS = "last_days_art" ID_PREFIX_ART_LAST_WEEKS = "last_weeks_art" ID_PREFIX_ART_LAST_MONTHS = "last_months_art" class ArticleStatSalesList(SingleResumeStat): """ Menu pour les statistiques de vente d'un article. """ model = Article context_object_name = "article" id_prefix = ID_PREFIX_ART_LAST nb_default = 2 url_stat = "kfet.article.stat.sales" stats = last_stats_manifest() @method_decorator(teamkfet_required) def dispatch(self, *args, **kwargs): return super().dispatch(*args, **kwargs) class ArticleStatSales(ScaleMixin, JSONDetailView): """ Statistiques (JSON) de vente d'un article. Sépare LIQ et les comptes K-Fêt, et rajoute le total. """ model = Article context_object_name = "article" def get_context_data(self, *args, **kwargs): old_ctx = super().get_context_data(*args, **kwargs) context = {"labels": old_ctx["labels"]} scale = self.scale all_purchases = ( Operation.objects.filter( type=Operation.PURCHASE, article=self.object, canceled_at=None ) .values("group__at", "article_nb") .order_by("group__at") ) liq_only = all_purchases.filter(group__on_acc__trigramme="LIQ") liq_exclude = all_purchases.exclude(group__on_acc__trigramme="LIQ") chunks_liq = scale.get_by_chunks( liq_only, field_db="group__at", field_callback=lambda d: d["group__at"] ) chunks_no_liq = scale.get_by_chunks( liq_exclude, field_db="group__at", field_callback=lambda d: d["group__at"] ) # On compte les opérations nb_ventes = [] nb_accounts = [] nb_liq = [] for chunk_liq, chunk_no_liq in zip(chunks_liq, chunks_no_liq): sum_accounts = sum(ope["article_nb"] for ope in chunk_no_liq) sum_liq = sum(ope["article_nb"] for ope in chunk_liq) nb_ventes.append(sum_accounts + sum_liq) nb_accounts.append(sum_accounts) nb_liq.append(sum_liq) context["charts"] = [ { "color": "rgb(200, 20, 60)", "label": "Toutes consommations", "values": nb_ventes, }, {"color": "rgb(54, 162, 235)", "label": "LIQ", "values": nb_liq}, { "color": "rgb(255, 205, 86)", "label": "Comptes K-Fêt", "values": nb_accounts, }, ] return context @method_decorator(teamkfet_required) def dispatch(self, *args, **kwargs): return super().dispatch(*args, **kwargs)