2880 lines
96 KiB
Python
2880 lines
96 KiB
Python
import heapq
|
|
import statistics
|
|
from collections import defaultdict
|
|
from datetime import datetime, timedelta
|
|
from decimal import Decimal
|
|
from typing import List, Tuple
|
|
from urllib.parse import urlencode
|
|
|
|
from asgiref.sync import async_to_sync
|
|
from channels.layers import get_channel_layer
|
|
from django.conf import settings
|
|
from django.contrib import messages
|
|
from django.contrib.auth.decorators import login_required, permission_required
|
|
from django.contrib.auth.mixins import PermissionRequiredMixin
|
|
from django.contrib.auth.models import Permission, User
|
|
from django.contrib.messages.views import SuccessMessageMixin
|
|
from django.core.exceptions import SuspiciousOperation
|
|
from django.core.mail import EmailMessage
|
|
from django.db import transaction
|
|
from django.db.models import (
|
|
Count,
|
|
DecimalField,
|
|
ExpressionWrapper,
|
|
F,
|
|
Max,
|
|
OuterRef,
|
|
Prefetch,
|
|
Q,
|
|
Subquery,
|
|
Sum,
|
|
)
|
|
from django.forms import ValidationError, formset_factory
|
|
from django.http import (
|
|
Http404,
|
|
HttpResponseBadRequest,
|
|
HttpResponseForbidden,
|
|
JsonResponse,
|
|
)
|
|
from django.shortcuts import get_object_or_404, redirect, render
|
|
from django.template import loader
|
|
from django.urls import reverse, reverse_lazy
|
|
from django.utils import timezone
|
|
from django.utils.decorators import method_decorator
|
|
from django.views.generic import DetailView, FormView, ListView, TemplateView
|
|
from django.views.generic.detail import BaseDetailView
|
|
from django.views.generic.edit import CreateView, DeleteView, UpdateView
|
|
|
|
from gestioncof.models import CofProfile
|
|
from kfet import KFET_DELETED_TRIGRAMME
|
|
from kfet.auth.decorators import kfet_password_auth
|
|
from kfet.autocomplete import kfet_account_only_autocomplete, kfet_autocomplete
|
|
from kfet.config import kfet_config
|
|
from kfet.decorators import teamkfet_required
|
|
from kfet.forms import (
|
|
AccountForm,
|
|
AccountFrozenForm,
|
|
AccountNoTriForm,
|
|
AccountPwdForm,
|
|
AccountStatForm,
|
|
AccountTriForm,
|
|
AddcostForm,
|
|
ArticleForm,
|
|
ArticleRestrictForm,
|
|
CategoryForm,
|
|
CheckoutForm,
|
|
CheckoutRestrictForm,
|
|
CheckoutStatementCreateForm,
|
|
CheckoutStatementUpdateForm,
|
|
CofForm,
|
|
ContactForm,
|
|
DemandeSoireeForm,
|
|
FilterHistoryForm,
|
|
InventoryArticleForm,
|
|
KFetConfigForm,
|
|
KPsulAccountForm,
|
|
KPsulCheckoutForm,
|
|
KPsulOperationFormSet,
|
|
KPsulOperationGroupForm,
|
|
OrderArticleForm,
|
|
OrderArticleToInventoryForm,
|
|
StatScaleForm,
|
|
TransferFormSet,
|
|
UserForm,
|
|
UserGroupForm,
|
|
UserInfoForm,
|
|
)
|
|
from kfet.models import (
|
|
Account,
|
|
AccountNegative,
|
|
Article,
|
|
ArticleCategory,
|
|
Checkout,
|
|
CheckoutStatement,
|
|
Inventory,
|
|
InventoryArticle,
|
|
Operation,
|
|
OperationGroup,
|
|
Order,
|
|
OrderArticle,
|
|
Supplier,
|
|
SupplierArticle,
|
|
Transfer,
|
|
TransferGroup,
|
|
)
|
|
from kfet.statistic import SCALE_DICT, DayScale, MonthScale, WeekScale, scale_url_params
|
|
from shared.views import AutocompleteView
|
|
|
|
from .auth import KFET_GENERIC_TRIGRAMME
|
|
from .auth.views import ( # noqa
|
|
AccountGroupCreate,
|
|
AccountGroupUpdate,
|
|
account_group,
|
|
login_generic,
|
|
)
|
|
|
|
|
|
def put_cleaned_data_in_dict(dict, form):
|
|
for field in form.cleaned_data:
|
|
dict[field] = form.cleaned_data[field]
|
|
|
|
|
|
class ContactView(FormView):
|
|
template_name = "kfet/contact.html"
|
|
form_class = ContactForm
|
|
success_url = reverse_lazy("kfet.contact")
|
|
|
|
def form_valid(self, form):
|
|
# Envoie un mail lorsque le formulaire est valide
|
|
EmailMessage(
|
|
form.cleaned_data["subject"],
|
|
form.cleaned_data["message"],
|
|
from_email=form.cleaned_data["from_email"],
|
|
to=("chefs-k-fet@ens.psl.eu",),
|
|
).send()
|
|
|
|
messages.success(
|
|
self.request,
|
|
"Votre message a bien été envoyé aux Wo·men K-Fêt.",
|
|
)
|
|
|
|
return super().form_valid(form)
|
|
|
|
|
|
class DemandeSoireeView(FormView):
|
|
template_name = "kfet/demande_soiree.html"
|
|
form_class = DemandeSoireeForm
|
|
success_url = reverse_lazy("kfet.demande-soiree")
|
|
|
|
def form_valid(self, form):
|
|
destinataires = ["chefs-k-fet@ens.psl.eu"]
|
|
|
|
if form.cleaned_data["contact_boum"]:
|
|
destinataires.append("boum@ens.psl.eu")
|
|
|
|
if form.cleaned_data["contact_pls"]:
|
|
destinataires.append("pls@ens.psl.eu")
|
|
|
|
# Envoie un mail lorsque le formulaire est valide
|
|
EmailMessage(
|
|
f"Demande de soirée le {form.cleaned_data['date']}",
|
|
loader.render_to_string(
|
|
"kfet/mails/demande_soiree.txt", context=form.cleaned_data
|
|
),
|
|
from_email=form.cleaned_data["from_email"],
|
|
to=destinataires,
|
|
cc=[form.cleaned_data["from_email"]],
|
|
).send()
|
|
|
|
messages.success(
|
|
self.request,
|
|
"Votre demande de soirée a bien été envoyée.",
|
|
)
|
|
|
|
return super().form_valid(form)
|
|
|
|
|
|
# -----
|
|
# Account views
|
|
# -----
|
|
|
|
# Account - General
|
|
|
|
|
|
@login_required
|
|
@teamkfet_required
|
|
def account(request):
|
|
accounts = Account.objects.select_related("cofprofile__user").order_by("trigramme")
|
|
return render(request, "kfet/account.html", {"accounts": accounts})
|
|
|
|
|
|
@login_required
|
|
@teamkfet_required
|
|
def account_is_validandfree_ajax(request):
|
|
if not request.GET.get("trigramme", ""):
|
|
raise Http404
|
|
trigramme = request.GET.get("trigramme")
|
|
data = Account.is_validandfree(trigramme)
|
|
return JsonResponse(data)
|
|
|
|
|
|
# Account - Create
|
|
|
|
|
|
@login_required
|
|
@teamkfet_required
|
|
@kfet_password_auth
|
|
def account_create(request):
|
|
# Enregistrement
|
|
if request.method == "POST":
|
|
trigramme_form = AccountTriForm(request.POST)
|
|
|
|
# Peuplement des forms
|
|
username = request.POST.get("username")
|
|
login_clipper = request.POST.get("login_clipper")
|
|
|
|
forms = get_account_create_forms(
|
|
request, username=username, login_clipper=login_clipper
|
|
)
|
|
|
|
account_form = forms["account_form"]
|
|
cof_form = forms["cof_form"]
|
|
user_form = forms["user_form"]
|
|
|
|
if all(
|
|
(
|
|
user_form.is_valid(),
|
|
cof_form.is_valid(),
|
|
trigramme_form.is_valid(),
|
|
account_form.is_valid(),
|
|
)
|
|
):
|
|
# Checking permission
|
|
if not request.user.has_perm("kfet.add_account"):
|
|
messages.error(
|
|
request, "Permission refusée", extra_tags="permission-denied"
|
|
)
|
|
else:
|
|
data = {}
|
|
# Fill data for Account.save()
|
|
put_cleaned_data_in_dict(data, user_form)
|
|
put_cleaned_data_in_dict(data, cof_form)
|
|
|
|
try:
|
|
account = trigramme_form.save(data=data)
|
|
account_form = AccountNoTriForm(request.POST, instance=account)
|
|
account_form.save()
|
|
messages.success(request, "Compte créé : %s" % account.trigramme)
|
|
account.send_creation_email()
|
|
return redirect("kfet.account.create")
|
|
except Account.UserHasAccount as e:
|
|
messages.error(
|
|
request,
|
|
"Cet utilisateur a déjà un compte K-Fêt : %s" % e.trigramme,
|
|
)
|
|
else:
|
|
initial = {"trigramme": request.GET.get("trigramme", "")}
|
|
trigramme_form = AccountTriForm(initial=initial)
|
|
account_form = None
|
|
cof_form = None
|
|
user_form = None
|
|
|
|
return render(
|
|
request,
|
|
"kfet/account_create.html",
|
|
{
|
|
"trigramme_form": trigramme_form,
|
|
"account_form": account_form,
|
|
"cof_form": cof_form,
|
|
"user_form": user_form,
|
|
},
|
|
)
|
|
|
|
|
|
def account_form_set_readonly_fields(user_form, cof_form):
|
|
user_form.fields["username"].widget.attrs["readonly"] = True
|
|
user_form.fields["first_name"].widget.attrs["readonly"] = True
|
|
user_form.fields["last_name"].widget.attrs["readonly"] = True
|
|
user_form.fields["email"].widget.attrs["readonly"] = True
|
|
cof_form.fields["login_clipper"].widget.attrs["readonly"] = True
|
|
cof_form.fields["departement"].widget.attrs["readonly"] = True
|
|
cof_form.fields["is_cof"].widget.attrs["disabled"] = True
|
|
|
|
|
|
def get_account_create_forms(
|
|
request=None, username=None, login_clipper=None, fullname=None
|
|
):
|
|
user = None
|
|
clipper = False
|
|
if login_clipper and (login_clipper == username or not username):
|
|
# à partir d'un clipper
|
|
# le user associé à ce clipper ne devrait pas encore exister
|
|
clipper = True
|
|
try:
|
|
# Vérification que clipper ne soit pas déjà dans User
|
|
user = User.objects.get(username=login_clipper)
|
|
# Ici, on nous a menti, le user existe déjà
|
|
username = user.username
|
|
clipper = False
|
|
except User.DoesNotExist:
|
|
# Clipper (sans user déjà existant)
|
|
|
|
# UserForm - Prefill
|
|
user_initial = {
|
|
"username": login_clipper,
|
|
"email": "%s@clipper.ens.fr" % login_clipper,
|
|
}
|
|
if fullname:
|
|
# Prefill du nom et prénom
|
|
names = fullname.split()
|
|
# Le premier, c'est le prénom
|
|
user_initial["first_name"] = names[0]
|
|
if len(names) > 1:
|
|
# Si d'autres noms -> tous dans le nom de famille
|
|
user_initial["last_name"] = " ".join(names[1:])
|
|
# CofForm - Prefill
|
|
cof_initial = {"login_clipper": login_clipper}
|
|
|
|
# Form créations
|
|
if request:
|
|
user_form = UserForm(request.POST, initial=user_initial)
|
|
cof_form = CofForm(request.POST, initial=cof_initial)
|
|
else:
|
|
user_form = UserForm(initial=user_initial)
|
|
cof_form = CofForm(initial=cof_initial)
|
|
|
|
# Protection (read-only) des champs username et login_clipper
|
|
account_form_set_readonly_fields(user_form, cof_form)
|
|
if username and not clipper:
|
|
try:
|
|
user = User.objects.get(username=username)
|
|
# le user existe déjà
|
|
# récupération du profil cof
|
|
(cof, _) = CofProfile.objects.get_or_create(user=user)
|
|
# UserForm + CofForm - Création à partir des instances existantes
|
|
if request:
|
|
user_form = UserForm(request.POST, instance=user)
|
|
cof_form = CofForm(request.POST, instance=cof)
|
|
else:
|
|
user_form = UserForm(instance=user)
|
|
cof_form = CofForm(instance=cof)
|
|
# Protection (read-only) des champs username, login_clipper et is_cof
|
|
account_form_set_readonly_fields(user_form, cof_form)
|
|
except User.DoesNotExist:
|
|
# le username donnée n'existe pas -> Création depuis rien
|
|
# (éventuellement en cours avec erreurs précédemment)
|
|
pass
|
|
if not user and not clipper:
|
|
# connaît pas du tout, faut tout remplir
|
|
if request:
|
|
user_form = UserForm(request.POST)
|
|
cof_form = CofForm(request.POST)
|
|
else:
|
|
user_form = UserForm()
|
|
cof_form = CofForm()
|
|
# mais on laisse le username en écriture
|
|
cof_form.fields["login_clipper"].widget.attrs["readonly"] = True
|
|
cof_form.fields["is_cof"].widget.attrs["disabled"] = True
|
|
|
|
if request:
|
|
account_form = AccountNoTriForm(request.POST)
|
|
else:
|
|
account_form = AccountNoTriForm()
|
|
|
|
return {"account_form": account_form, "cof_form": cof_form, "user_form": user_form}
|
|
|
|
|
|
@login_required
|
|
@teamkfet_required
|
|
def account_create_ajax(request, username=None, login_clipper=None, fullname=None):
|
|
forms = get_account_create_forms(
|
|
request=None, username=username, login_clipper=login_clipper, fullname=fullname
|
|
)
|
|
return render(
|
|
request,
|
|
"kfet/account_create_form.html",
|
|
{
|
|
"account_form": forms["account_form"],
|
|
"cof_form": forms["cof_form"],
|
|
"user_form": forms["user_form"],
|
|
},
|
|
)
|
|
|
|
|
|
# Account - Read
|
|
|
|
|
|
@login_required
|
|
def account_read(request, trigramme):
|
|
account = get_object_or_404(Account, trigramme=trigramme)
|
|
|
|
# Checking permissions
|
|
if not account.readable or (
|
|
not request.user.has_perm("kfet.is_team") and request.user != account.user
|
|
):
|
|
raise Http404
|
|
|
|
addcosts = (
|
|
OperationGroup.objects.filter(opes__addcost_for=account, opes__canceled_at=None)
|
|
.extra({"date": "date(at)"})
|
|
.values("date")
|
|
.annotate(sum_addcosts=Sum("opes__addcost_amount"))
|
|
.order_by("-date")
|
|
)
|
|
|
|
return render(
|
|
request, "kfet/account_read.html", {"account": account, "addcosts": addcosts}
|
|
)
|
|
|
|
|
|
# Account - Update
|
|
|
|
|
|
@teamkfet_required
|
|
@kfet_password_auth
|
|
def account_update(request, trigramme):
|
|
account = get_object_or_404(Account, trigramme=trigramme)
|
|
|
|
# Checking permissions
|
|
if not account.editable:
|
|
# Plus de leak de trigramme !
|
|
return HttpResponseForbidden
|
|
|
|
user_info_form = UserInfoForm(instance=account.user)
|
|
account_form = AccountForm(instance=account)
|
|
group_form = UserGroupForm(instance=account.user)
|
|
frozen_form = AccountFrozenForm(instance=account)
|
|
pwd_form = AccountPwdForm()
|
|
|
|
if request.method == "POST":
|
|
self_update = request.user == account.user
|
|
account_form = AccountForm(request.POST, instance=account)
|
|
group_form = UserGroupForm(request.POST, instance=account.user)
|
|
frozen_form = AccountFrozenForm(request.POST, instance=account)
|
|
pwd_form = AccountPwdForm(request.POST, account=account)
|
|
|
|
forms = []
|
|
warnings = []
|
|
|
|
if self_update or request.user.has_perm("kfet.change_account"):
|
|
forms.append(account_form)
|
|
elif account_form.has_changed():
|
|
warnings.append("compte")
|
|
|
|
if request.user.has_perm("kfet.manage_perms"):
|
|
forms.append(group_form)
|
|
forms.append(frozen_form)
|
|
elif group_form.has_changed():
|
|
warnings.append("statut d'équipe")
|
|
|
|
# Il ne faut pas valider `pwd_form` si elle est inchangée
|
|
if pwd_form.has_changed():
|
|
if self_update or request.user.has_perm("kfet.change_account_password"):
|
|
forms.append(pwd_form)
|
|
else:
|
|
warnings.append("mot de passe")
|
|
|
|
# Updating account info
|
|
if forms == []:
|
|
messages.error(
|
|
request,
|
|
"Informations non mises à jour : permission refusée",
|
|
extra_tags="permission-denied",
|
|
)
|
|
else:
|
|
if all(form.is_valid() for form in forms):
|
|
for form in forms:
|
|
form.save()
|
|
|
|
if len(warnings):
|
|
messages.warning(
|
|
request,
|
|
"Permissions insuffisantes pour modifier"
|
|
" les informations suivantes : {}.".format(", ".join(warnings)),
|
|
)
|
|
if self_update:
|
|
messages.success(request, "Vos informations ont été mises à jour !")
|
|
else:
|
|
messages.success(
|
|
request,
|
|
"Informations du compte %s mises à jour" % account.trigramme,
|
|
)
|
|
|
|
return redirect("kfet.account.read", account.trigramme)
|
|
else:
|
|
messages.error(
|
|
request, "Informations non mises à jour : corrigez les erreurs"
|
|
)
|
|
|
|
return render(
|
|
request,
|
|
"kfet/account_update.html",
|
|
{
|
|
"user_info_form": user_info_form,
|
|
"account": account,
|
|
"account_form": account_form,
|
|
"frozen_form": frozen_form,
|
|
"group_form": group_form,
|
|
"pwd_form": pwd_form,
|
|
},
|
|
)
|
|
|
|
|
|
# Account - Delete
|
|
|
|
|
|
class AccountDelete(PermissionRequiredMixin, DeleteView):
|
|
model = Account
|
|
slug_field = "trigramme"
|
|
slug_url_kwarg = "trigramme"
|
|
success_url = reverse_lazy("kfet.account")
|
|
success_message = "Compte supprimé avec succès !"
|
|
permission_required = "kfet.delete_account"
|
|
|
|
http_method_names = ["post"]
|
|
|
|
def delete(self, request, *args, **kwargs):
|
|
self.object = self.get_object()
|
|
if self.object.balance >= 0.01:
|
|
messages.error(
|
|
request,
|
|
"Impossible de supprimer un compte "
|
|
"avec une balance strictement positive !",
|
|
)
|
|
return redirect("kfet.account.read", self.object.trigramme)
|
|
|
|
if self.object.trigramme in [
|
|
"LIQ",
|
|
KFET_GENERIC_TRIGRAMME,
|
|
KFET_DELETED_TRIGRAMME,
|
|
"#13",
|
|
]:
|
|
messages.error(request, "Impossible de supprimer un trigramme protégé !")
|
|
return redirect("kfet.account.read", self.object.trigramme)
|
|
|
|
# SuccessMessageMixin does not work with DeleteView, see :
|
|
# https://code.djangoproject.com/ticket/21926
|
|
messages.success(request, self.success_message)
|
|
return super().delete(request, *args, **kwargs)
|
|
|
|
|
|
class AccountNegativeList(ListView):
|
|
queryset = (
|
|
AccountNegative.objects.select_related("account", "account__cofprofile__user")
|
|
.filter(account__balance__lt=0)
|
|
.exclude(account__trigramme="#13")
|
|
)
|
|
template_name = "kfet/account_negative.html"
|
|
context_object_name = "negatives"
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super().get_context_data(**kwargs)
|
|
balances = (neg.account.balance for neg in self.object_list)
|
|
context["negatives_sum"] = sum(balances)
|
|
return context
|
|
|
|
|
|
# -----
|
|
# Checkout views
|
|
# -----
|
|
|
|
# Checkout - General
|
|
|
|
|
|
class CheckoutList(ListView):
|
|
model = Checkout
|
|
template_name = "kfet/checkout.html"
|
|
context_object_name = "checkouts"
|
|
|
|
|
|
# Checkout - Create
|
|
|
|
|
|
@method_decorator(kfet_password_auth, name="dispatch")
|
|
class CheckoutCreate(SuccessMessageMixin, CreateView):
|
|
model = Checkout
|
|
template_name = "kfet/checkout_create.html"
|
|
form_class = CheckoutForm
|
|
success_message = "Nouvelle caisse : %(name)s"
|
|
|
|
# Surcharge de la validation
|
|
def form_valid(self, form):
|
|
# Checking permission
|
|
if not self.request.user.has_perm("kfet.add_checkout"):
|
|
form.add_error(
|
|
None, ValidationError("Permission refusée", code="permission-denied")
|
|
)
|
|
return self.form_invalid(form)
|
|
|
|
# Creating
|
|
form.instance.created_by = self.request.user.profile.account_kfet
|
|
form.save()
|
|
|
|
return super().form_valid(form)
|
|
|
|
|
|
# Checkout - Read
|
|
|
|
|
|
class CheckoutRead(DetailView):
|
|
model = Checkout
|
|
template_name = "kfet/checkout_read.html"
|
|
context_object_name = "checkout"
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super().get_context_data(**kwargs)
|
|
context["statements"] = context["checkout"].statements.order_by("-at")
|
|
return context
|
|
|
|
|
|
# Checkout - Update
|
|
|
|
|
|
@method_decorator(kfet_password_auth, name="dispatch")
|
|
class CheckoutUpdate(SuccessMessageMixin, UpdateView):
|
|
model = Checkout
|
|
template_name = "kfet/checkout_update.html"
|
|
form_class = CheckoutRestrictForm
|
|
success_message = "Informations mises à jour pour la caisse : %(name)s"
|
|
|
|
# Surcharge de la validation
|
|
def form_valid(self, form):
|
|
# Checking permission
|
|
if not self.request.user.has_perm("kfet.change_checkout"):
|
|
form.add_error(
|
|
None, ValidationError("Permission refusée", code="permission-denied")
|
|
)
|
|
return self.form_invalid(form)
|
|
# Updating
|
|
return super().form_valid(form)
|
|
|
|
|
|
# -----
|
|
# Checkout Statement views
|
|
# -----
|
|
|
|
# Checkout Statement - General
|
|
|
|
|
|
class CheckoutStatementList(ListView):
|
|
model = CheckoutStatement
|
|
queryset = CheckoutStatement.objects.order_by("-at")
|
|
template_name = "kfet/checkoutstatement.html"
|
|
context_object_name = "checkoutstatements"
|
|
|
|
|
|
# Checkout Statement - Create
|
|
|
|
|
|
def getAmountTaken(data):
|
|
return Decimal(
|
|
data.taken_001 * 0.01
|
|
+ data.taken_002 * 0.02
|
|
+ data.taken_005 * 0.05
|
|
+ data.taken_01 * 0.1
|
|
+ data.taken_02 * 0.2
|
|
+ data.taken_05 * 0.5
|
|
+ data.taken_1 * 1
|
|
+ data.taken_2 * 2
|
|
+ data.taken_5 * 5
|
|
+ data.taken_10 * 10
|
|
+ data.taken_20 * 20
|
|
+ data.taken_50 * 50
|
|
+ data.taken_100 * 100
|
|
+ data.taken_200 * 200
|
|
+ data.taken_500 * 500
|
|
+ float(data.taken_cheque)
|
|
)
|
|
|
|
|
|
def getAmountBalance(data):
|
|
return Decimal(
|
|
data["balance_001"] * 0.01
|
|
+ data["balance_002"] * 0.02
|
|
+ data["balance_005"] * 0.05
|
|
+ data["balance_01"] * 0.1
|
|
+ data["balance_02"] * 0.2
|
|
+ data["balance_05"] * 0.5
|
|
+ data["balance_1"] * 1
|
|
+ data["balance_2"] * 2
|
|
+ data["balance_5"] * 5
|
|
+ data["balance_10"] * 10
|
|
+ data["balance_20"] * 20
|
|
+ data["balance_50"] * 50
|
|
+ data["balance_100"] * 100
|
|
+ data["balance_200"] * 200
|
|
+ data["balance_500"] * 500
|
|
)
|
|
|
|
|
|
@method_decorator(kfet_password_auth, name="dispatch")
|
|
class CheckoutStatementCreate(SuccessMessageMixin, CreateView):
|
|
model = CheckoutStatement
|
|
template_name = "kfet/checkoutstatement_create.html"
|
|
form_class = CheckoutStatementCreateForm
|
|
success_message = "Nouveau relevé : %(checkout)s - %(at)s"
|
|
|
|
def get_success_url(self):
|
|
return reverse_lazy(
|
|
"kfet.checkout.read", kwargs={"pk": self.kwargs["pk_checkout"]}
|
|
)
|
|
|
|
def get_success_message(self, cleaned_data):
|
|
return self.success_message % dict(
|
|
cleaned_data, checkout=self.object.checkout.name, at=self.object.at
|
|
)
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super().get_context_data(**kwargs)
|
|
checkout = Checkout.objects.get(pk=self.kwargs["pk_checkout"])
|
|
context["checkout"] = checkout
|
|
return context
|
|
|
|
def form_valid(self, form):
|
|
# Checking permission
|
|
if not self.request.user.has_perm("kfet.add_checkoutstatement"):
|
|
form.add_error(
|
|
None, ValidationError("Permission refusée", code="permission-denied")
|
|
)
|
|
return self.form_invalid(form)
|
|
# Creating
|
|
form.instance.amount_taken = getAmountTaken(form.instance)
|
|
if not form.instance.not_count:
|
|
form.instance.balance_new = getAmountBalance(form.cleaned_data)
|
|
form.instance.checkout_id = self.kwargs["pk_checkout"]
|
|
form.instance.by = self.request.user.profile.account_kfet
|
|
return super().form_valid(form)
|
|
|
|
|
|
@method_decorator(kfet_password_auth, name="dispatch")
|
|
class CheckoutStatementUpdate(SuccessMessageMixin, UpdateView):
|
|
model = CheckoutStatement
|
|
template_name = "kfet/checkoutstatement_update.html"
|
|
form_class = CheckoutStatementUpdateForm
|
|
success_message = "Relevé modifié"
|
|
|
|
def get_success_url(self):
|
|
return reverse_lazy(
|
|
"kfet.checkout.read", kwargs={"pk": self.kwargs["pk_checkout"]}
|
|
)
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super().get_context_data(**kwargs)
|
|
checkout = Checkout.objects.get(pk=self.kwargs["pk_checkout"])
|
|
context["checkout"] = checkout
|
|
return context
|
|
|
|
def form_valid(self, form):
|
|
# Checking permission
|
|
if not self.request.user.has_perm("kfet.change_checkoutstatement"):
|
|
form.add_error(
|
|
None, ValidationError("Permission refusée", code="permission-denied")
|
|
)
|
|
return self.form_invalid(form)
|
|
# Updating
|
|
form.instance.amount_taken = getAmountTaken(form.instance)
|
|
return super().form_valid(form)
|
|
|
|
|
|
# -----
|
|
# Category views
|
|
# -----
|
|
|
|
|
|
# Category - General
|
|
class CategoryList(ListView):
|
|
queryset = ArticleCategory.objects.prefetch_related("articles").order_by("name")
|
|
template_name = "kfet/category.html"
|
|
context_object_name = "categories"
|
|
|
|
|
|
# Category - Update
|
|
@method_decorator(kfet_password_auth, name="dispatch")
|
|
class CategoryUpdate(SuccessMessageMixin, UpdateView):
|
|
model = ArticleCategory
|
|
template_name = "kfet/category_update.html"
|
|
form_class = CategoryForm
|
|
success_url = reverse_lazy("kfet.category")
|
|
success_message = "Informations mises à jour pour la catégorie : %(name)s"
|
|
|
|
# Surcharge de la validation
|
|
def form_valid(self, form):
|
|
# Checking permission
|
|
if not self.request.user.has_perm("kfet.change_articlecategory"):
|
|
form.add_error(
|
|
None, ValidationError("Permission refusée", code="permission-denied")
|
|
)
|
|
return self.form_invalid(form)
|
|
|
|
# Updating
|
|
return super().form_valid(form)
|
|
|
|
|
|
# -----
|
|
# Article views
|
|
# -----
|
|
|
|
|
|
# Article - General
|
|
class ArticleList(ListView):
|
|
queryset = (
|
|
Article.objects.select_related("category")
|
|
.prefetch_related(
|
|
Prefetch(
|
|
"inventories",
|
|
queryset=Inventory.objects.order_by("-at"),
|
|
to_attr="inventory",
|
|
)
|
|
)
|
|
.order_by("category__name", "-is_sold", "name")
|
|
)
|
|
template_name = "kfet/article.html"
|
|
context_object_name = "articles"
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super().get_context_data(**kwargs)
|
|
articles = context[self.context_object_name]
|
|
context["nb_articles"] = len(articles)
|
|
context[self.context_object_name] = articles.filter(is_sold=True)
|
|
context["not_sold_articles"] = articles.filter(is_sold=False)
|
|
return context
|
|
|
|
|
|
# Article - Create
|
|
@method_decorator(kfet_password_auth, name="dispatch")
|
|
class ArticleCreate(SuccessMessageMixin, CreateView):
|
|
model = Article
|
|
template_name = "kfet/article_create.html"
|
|
form_class = ArticleForm
|
|
success_message = "Nouvel item : %(category)s - %(name)s"
|
|
|
|
# Surcharge de la validation
|
|
def form_valid(self, form):
|
|
# Checking permission
|
|
if not self.request.user.has_perm("kfet.add_article"):
|
|
form.add_error(
|
|
None, ValidationError("Permission refusée", code="permission-denied")
|
|
)
|
|
return self.form_invalid(form)
|
|
|
|
# Save ici pour save le manytomany suppliers
|
|
article = form.save()
|
|
# Save des suppliers déjà existant
|
|
for supplier in form.cleaned_data["suppliers"]:
|
|
SupplierArticle.objects.create(article=article, supplier=supplier)
|
|
|
|
# Nouveau supplier
|
|
supplier_new = form.cleaned_data["supplier_new"].strip()
|
|
if supplier_new:
|
|
supplier, created = Supplier.objects.get_or_create(name=supplier_new)
|
|
if created:
|
|
SupplierArticle.objects.create(article=article, supplier=supplier)
|
|
|
|
# Inventaire avec stock initial
|
|
inventory = Inventory()
|
|
inventory.by = self.request.user.profile.account_kfet
|
|
inventory.save()
|
|
InventoryArticle.objects.create(
|
|
inventory=inventory,
|
|
article=article,
|
|
stock_old=article.stock,
|
|
stock_new=article.stock,
|
|
)
|
|
|
|
# Creating
|
|
return super().form_valid(form)
|
|
|
|
|
|
# Article - Read
|
|
class ArticleRead(DetailView):
|
|
model = Article
|
|
template_name = "kfet/article_read.html"
|
|
context_object_name = "article"
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super().get_context_data(**kwargs)
|
|
inventoryarts = (
|
|
InventoryArticle.objects.filter(article=self.object)
|
|
.select_related("inventory")
|
|
.order_by("-inventory__at")
|
|
)
|
|
context["inventoryarts"] = inventoryarts
|
|
supplierarts = (
|
|
SupplierArticle.objects.filter(article=self.object)
|
|
.select_related("supplier")
|
|
.order_by("-at")
|
|
)
|
|
context["supplierarts"] = supplierarts
|
|
return context
|
|
|
|
|
|
# Article - Update
|
|
@method_decorator(kfet_password_auth, name="dispatch")
|
|
class ArticleUpdate(SuccessMessageMixin, UpdateView):
|
|
model = Article
|
|
template_name = "kfet/article_update.html"
|
|
form_class = ArticleRestrictForm
|
|
success_message = "Informations mises à jour pour l'article : %(name)s"
|
|
|
|
# Surcharge de la validation
|
|
def form_valid(self, form):
|
|
# Checking permission
|
|
if not self.request.user.has_perm("kfet.change_article"):
|
|
form.add_error(
|
|
None, ValidationError("Permission refusée", code="permission-denied")
|
|
)
|
|
return self.form_invalid(form)
|
|
|
|
# Save ici pour save le manytomany suppliers
|
|
article = form.save()
|
|
# Save des suppliers déjà existant
|
|
for supplier in form.cleaned_data["suppliers"]:
|
|
if supplier not in article.suppliers.all():
|
|
SupplierArticle.objects.create(article=article, supplier=supplier)
|
|
|
|
# On vire les suppliers désélectionnés
|
|
for supplier in article.suppliers.all():
|
|
if supplier not in form.cleaned_data["suppliers"]:
|
|
SupplierArticle.objects.filter(
|
|
article=article, supplier=supplier
|
|
).delete()
|
|
|
|
# Nouveau supplier
|
|
supplier_new = form.cleaned_data["supplier_new"].strip()
|
|
if supplier_new:
|
|
supplier, created = Supplier.objects.get_or_create(name=supplier_new)
|
|
if created:
|
|
SupplierArticle.objects.create(article=article, supplier=supplier)
|
|
|
|
# Updating
|
|
return super().form_valid(form)
|
|
|
|
|
|
class ArticleDelete(PermissionRequiredMixin, DeleteView):
|
|
model = Article
|
|
success_url = reverse_lazy("kfet.article")
|
|
success_message = "Article supprimé avec succès !"
|
|
permission_required = "kfet.delete_article"
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
return redirect("kfet.article.read", self.kwargs.get(self.pk_url_kwarg))
|
|
|
|
def delete(self, request, *args, **kwargs):
|
|
messages.success(request, self.success_message)
|
|
return super().delete(request, *args, **kwargs)
|
|
|
|
|
|
# -----
|
|
# K-Psul
|
|
# -----
|
|
|
|
|
|
@teamkfet_required
|
|
def kpsul(request):
|
|
data = {}
|
|
data["operationgroup_form"] = KPsulOperationGroupForm()
|
|
data["trigramme_form"] = KPsulAccountForm()
|
|
data["checkout_form"] = KPsulCheckoutForm()
|
|
data["operation_formset"] = KPsulOperationFormSet(queryset=Operation.objects.none())
|
|
return render(request, "kfet/kpsul.html", data)
|
|
|
|
|
|
@teamkfet_required
|
|
def kpsul_get_settings(request):
|
|
addcost_for = kfet_config.addcost_for
|
|
data = {
|
|
"subvention_cof": kfet_config.subvention_cof,
|
|
"addcost_for": addcost_for and addcost_for.trigramme or "",
|
|
"addcost_amount": kfet_config.addcost_amount,
|
|
}
|
|
return JsonResponse(data)
|
|
|
|
|
|
@teamkfet_required
|
|
def account_read_json(request, trigramme):
|
|
account = get_object_or_404(Account, trigramme=trigramme)
|
|
if not account.readable:
|
|
raise Http404
|
|
data = {
|
|
"id": account.pk,
|
|
"name": account.name,
|
|
"email": account.email,
|
|
"is_cof": account.is_cof,
|
|
"promo": account.promo,
|
|
"balance": account.balance,
|
|
"is_frozen": account.is_frozen,
|
|
"departement": account.departement,
|
|
"nickname": account.nickname,
|
|
"trigramme": account.trigramme,
|
|
}
|
|
return JsonResponse(data)
|
|
|
|
|
|
@teamkfet_required
|
|
def kpsul_checkout_data(request):
|
|
pk = request.POST.get("pk", 0)
|
|
if not pk:
|
|
pk = 0
|
|
|
|
data = (
|
|
Checkout.objects.annotate(
|
|
last_statement_by_first_name=F(
|
|
"statements__by__cofprofile__user__first_name"
|
|
),
|
|
last_statement_by_last_name=F(
|
|
"statements__by__cofprofile__user__last_name"
|
|
),
|
|
last_statement_by_trigramme=F("statements__by__trigramme"),
|
|
last_statement_balance=F("statements__balance_new"),
|
|
last_statement_at=F("statements__at"),
|
|
)
|
|
.select_related(
|
|
"statements" "statements__by", "statements__by__cofprofile__user"
|
|
)
|
|
.filter(pk=pk)
|
|
.order_by("statements__at")
|
|
.values(
|
|
"id",
|
|
"name",
|
|
"balance",
|
|
"valid_from",
|
|
"valid_to",
|
|
"last_statement_balance",
|
|
"last_statement_at",
|
|
"last_statement_by_trigramme",
|
|
"last_statement_by_last_name",
|
|
"last_statement_by_first_name",
|
|
)
|
|
.last()
|
|
)
|
|
if data is None:
|
|
raise Http404
|
|
return JsonResponse(data)
|
|
|
|
|
|
@teamkfet_required
|
|
@kfet_password_auth
|
|
def kpsul_update_addcost(request):
|
|
addcost_form = AddcostForm(request.POST)
|
|
data = {"errors": []}
|
|
|
|
if not addcost_form.is_valid():
|
|
for field, errors in addcost_form.errors.items():
|
|
for error in errors:
|
|
data["errors"].append({"code": f"invalid_{field}", "message": error})
|
|
|
|
return JsonResponse(data, status=400)
|
|
|
|
required_perms = ["kfet.manage_addcosts"]
|
|
if not request.user.has_perms(required_perms):
|
|
data["missing_perms"] = get_missing_perms(required_perms, request.user)
|
|
return JsonResponse(data, status=403)
|
|
|
|
trigramme = addcost_form.cleaned_data["trigramme"]
|
|
account = trigramme and Account.objects.get(trigramme=trigramme) or None
|
|
amount = addcost_form.cleaned_data["amount"]
|
|
|
|
kfet_config.set(addcost_for=account, addcost_amount=amount)
|
|
|
|
data = {
|
|
"addcost": {"for": account and account.trigramme or None, "amount": amount},
|
|
"type": "kpsul",
|
|
}
|
|
|
|
channel_layer = get_channel_layer()
|
|
|
|
async_to_sync(channel_layer.group_send)("kfet.kpsul", data)
|
|
return JsonResponse(data)
|
|
|
|
|
|
def get_missing_perms(required_perms: List[str], user: User) -> List[str]:
|
|
def get_perm_name(app_label: str, codename: str) -> str:
|
|
return Permission.objects.values_list("name", flat=True).get(
|
|
codename=codename, content_type__app_label=app_label
|
|
)
|
|
|
|
missing_perms = [
|
|
get_perm_name(*perm.split("."))
|
|
for perm in required_perms
|
|
if not user.has_perm(perm)
|
|
]
|
|
|
|
return missing_perms
|
|
|
|
|
|
@teamkfet_required
|
|
@kfet_password_auth
|
|
def kpsul_perform_operations(request):
|
|
# Initializing response data
|
|
data = {"errors": []}
|
|
|
|
# Checking operationgroup
|
|
operationgroup_form = KPsulOperationGroupForm(request.POST)
|
|
if not operationgroup_form.is_valid():
|
|
for field in operationgroup_form.errors:
|
|
verbose_field, feminin = (
|
|
("compte", "") if field == "on_acc" else ("caisse", "e")
|
|
)
|
|
data["errors"].append(
|
|
{
|
|
"code": f"invalid_{field}",
|
|
"message": f"Pas de {verbose_field} sélectionné{feminin}",
|
|
}
|
|
)
|
|
|
|
# Checking operation_formset
|
|
operation_formset = KPsulOperationFormSet(request.POST)
|
|
if not operation_formset.is_valid():
|
|
data["errors"].append(
|
|
{
|
|
"code": "invalid_formset",
|
|
"message": "Formulaire d'opérations vide ou invalide",
|
|
}
|
|
)
|
|
|
|
# Returning BAD REQUEST if errors
|
|
if data["errors"]:
|
|
return JsonResponse(data, status=400)
|
|
|
|
# Pre-saving (no commit)
|
|
operationgroup = operationgroup_form.save(commit=False)
|
|
operations = operation_formset.save(commit=False)
|
|
on_acc = operationgroup.on_acc
|
|
|
|
# Retrieving COF grant
|
|
cof_grant = kfet_config.subvention_cof
|
|
# Retrieving addcosts data
|
|
addcost_amount = kfet_config.addcost_amount
|
|
addcost_for = kfet_config.addcost_for
|
|
|
|
# Initializing vars
|
|
required_perms = set() # Required perms to perform all operations
|
|
cof_grant_divisor = 1 + cof_grant / 100
|
|
to_addcost_for_balance = 0 # For balance of addcost_for
|
|
to_checkout_balance = 0 # For balance of selected checkout
|
|
to_articles_stocks = defaultdict(lambda: 0) # For stocks articles
|
|
is_addcost = all((addcost_for, addcost_amount, addcost_for != on_acc))
|
|
need_comment = on_acc.need_comment
|
|
|
|
if on_acc.is_frozen:
|
|
data["errors"].append(
|
|
{"code": "frozen_acc", "message": f"Le compte {on_acc.trigramme} est gelé"}
|
|
)
|
|
|
|
# Filling data of each operations
|
|
# + operationgroup + calculating other stuffs
|
|
for operation in operations:
|
|
if operation.type == Operation.PURCHASE:
|
|
operation.amount = -operation.article.price * operation.article_nb
|
|
if is_addcost & operation.article.category.has_addcost:
|
|
operation.addcost_for = addcost_for
|
|
operation.addcost_amount = addcost_amount * operation.article_nb
|
|
operation.amount -= operation.addcost_amount
|
|
to_addcost_for_balance += operation.addcost_amount
|
|
if on_acc.is_cash:
|
|
to_checkout_balance += -operation.amount
|
|
if on_acc.is_cof and operation.article.category.has_reduction:
|
|
if is_addcost and operation.article.category.has_addcost:
|
|
operation.addcost_amount /= cof_grant_divisor
|
|
operation.amount = operation.amount / cof_grant_divisor
|
|
to_articles_stocks[operation.article] -= operation.article_nb
|
|
else:
|
|
if on_acc.is_cash:
|
|
data["errors"].append(
|
|
{
|
|
"code": "invalid_liq",
|
|
"message": (
|
|
"Impossible de compter autre chose que des achats sur LIQ"
|
|
),
|
|
}
|
|
)
|
|
if operation.type != Operation.EDIT:
|
|
to_checkout_balance += operation.amount
|
|
operationgroup.amount += operation.amount
|
|
if operation.type == Operation.DEPOSIT:
|
|
required_perms.add("kfet.perform_deposit")
|
|
if operation.type == Operation.EDIT:
|
|
required_perms.add("kfet.edit_balance_account")
|
|
need_comment = True
|
|
if on_acc.is_cof:
|
|
to_addcost_for_balance = to_addcost_for_balance / cof_grant_divisor
|
|
|
|
(perms, stop) = on_acc.perms_to_perform_operation(amount=operationgroup.amount)
|
|
required_perms |= perms
|
|
|
|
if stop:
|
|
data["errors"].append(
|
|
{
|
|
"code": "negative",
|
|
"message": f"Le compte {on_acc.trigramme} a un solde insuffisant.",
|
|
}
|
|
)
|
|
|
|
if need_comment:
|
|
operationgroup.comment = operationgroup.comment.strip()
|
|
if not operationgroup.comment:
|
|
data["need_comment"] = True
|
|
|
|
if data["errors"] or "need_comment" in data:
|
|
return JsonResponse(data, status=400)
|
|
|
|
if not request.user.has_perms(required_perms):
|
|
data["missing_perms"] = get_missing_perms(required_perms, request.user)
|
|
return JsonResponse(data, status=403)
|
|
|
|
# If 1 perm is required, filling who perform the operations
|
|
if required_perms:
|
|
operationgroup.valid_by = request.user.profile.account_kfet
|
|
# Filling cof status for statistics
|
|
operationgroup.is_cof = on_acc.is_cof
|
|
|
|
# Starting transaction to ensure data consistency
|
|
with transaction.atomic():
|
|
# If not cash account,
|
|
# saving account's balance and adding to Negative if not in
|
|
if not on_acc.is_cash:
|
|
(
|
|
Account.objects.filter(pk=on_acc.pk).update(
|
|
balance=F("balance") + operationgroup.amount
|
|
)
|
|
)
|
|
on_acc.refresh_from_db()
|
|
on_acc.update_negative()
|
|
|
|
# Updating checkout's balance
|
|
if to_checkout_balance:
|
|
Checkout.objects.filter(pk=operationgroup.checkout.pk).update(
|
|
balance=F("balance") + to_checkout_balance
|
|
)
|
|
|
|
# Saving addcost_for with new balance if there is one
|
|
if is_addcost and to_addcost_for_balance:
|
|
Account.objects.filter(pk=addcost_for.pk).update(
|
|
balance=F("balance") + to_addcost_for_balance
|
|
)
|
|
|
|
# Saving operation group
|
|
operationgroup.save()
|
|
# Filling operationgroup id for each operations and saving
|
|
for operation in operations:
|
|
operation.group = operationgroup
|
|
operation.save()
|
|
|
|
# Updating articles stock
|
|
for article in to_articles_stocks:
|
|
Article.objects.filter(pk=article.pk).update(
|
|
stock=F("stock") + to_articles_stocks[article]
|
|
)
|
|
|
|
# Websocket data
|
|
websocket_data = {"type": "kpsul"}
|
|
websocket_data["groups"] = [
|
|
{
|
|
"add": True,
|
|
"type": "operation",
|
|
"id": operationgroup.pk,
|
|
"amount": operationgroup.amount,
|
|
"checkout__name": operationgroup.checkout.name,
|
|
"at": operationgroup.at,
|
|
"is_cof": operationgroup.is_cof,
|
|
"comment": operationgroup.comment,
|
|
"valid_by__trigramme": (
|
|
operationgroup.valid_by and operationgroup.valid_by.trigramme or None
|
|
),
|
|
"on_acc__trigramme": on_acc.trigramme,
|
|
"entries": [],
|
|
}
|
|
]
|
|
for operation in operations:
|
|
ope_data = {
|
|
"id": operation.pk,
|
|
"type": operation.type,
|
|
"amount": operation.amount,
|
|
"addcost_amount": operation.addcost_amount,
|
|
"addcost_for__trigramme": (
|
|
operation.addcost_for and addcost_for.trigramme or None
|
|
),
|
|
"article__name": (operation.article and operation.article.name or None),
|
|
"article_nb": operation.article_nb,
|
|
"group_id": operationgroup.pk,
|
|
"canceled_by__trigramme": None,
|
|
"canceled_at": None,
|
|
}
|
|
websocket_data["groups"][0]["entries"].append(ope_data)
|
|
# Need refresh from db cause we used update on queryset
|
|
operationgroup.checkout.refresh_from_db()
|
|
websocket_data["checkouts"] = [
|
|
{"id": operationgroup.checkout.pk, "balance": operationgroup.checkout.balance}
|
|
]
|
|
websocket_data["articles"] = []
|
|
# Need refresh from db cause we used update on querysets
|
|
articles_pk = [article.pk for article in to_articles_stocks]
|
|
articles = Article.objects.values("id", "stock").filter(pk__in=articles_pk)
|
|
for article in articles:
|
|
websocket_data["articles"].append(
|
|
{"id": article["id"], "stock": article["stock"]}
|
|
)
|
|
|
|
channel_layer = get_channel_layer()
|
|
|
|
async_to_sync(channel_layer.group_send)("kfet.kpsul", websocket_data)
|
|
return JsonResponse(data)
|
|
|
|
|
|
@teamkfet_required
|
|
@kfet_password_auth
|
|
def cancel_operations(request):
|
|
# Pour la réponse
|
|
data = {"canceled": [], "warnings": {}, "errors": []}
|
|
|
|
# Checking if BAD REQUEST (opes_pk not int or not existing)
|
|
try:
|
|
# Set pour virer les doublons
|
|
opes_post = set(
|
|
map(int, filter(None, request.POST.getlist("operations[]", [])))
|
|
)
|
|
except ValueError:
|
|
data["errors"].append(
|
|
{"code": "invalid_request", "message": "Requête invalide !"}
|
|
)
|
|
return JsonResponse(data, status=400)
|
|
|
|
opes_all = Operation.objects.select_related(
|
|
"group", "group__on_acc", "group__on_acc__negative"
|
|
).filter(pk__in=opes_post)
|
|
opes_pk = [ope.pk for ope in opes_all]
|
|
opes_notexisting = [ope for ope in opes_post if ope not in opes_pk]
|
|
if opes_notexisting:
|
|
data["errors"].append(
|
|
{
|
|
"code": "cancel_missing",
|
|
"message": "Opérations inexistantes : {}".format(
|
|
", ".join(map(str, opes_notexisting))
|
|
),
|
|
}
|
|
)
|
|
return JsonResponse(data, status=400)
|
|
|
|
opes_already_canceled = [] # Déjà annulée
|
|
opes = [] # Pas déjà annulée
|
|
required_perms = set()
|
|
cancel_duration = kfet_config.cancel_duration
|
|
|
|
# Modifs à faire sur les balances des comptes
|
|
to_accounts_balances = defaultdict(int)
|
|
# ------ sur les montants des groupes d'opé
|
|
to_groups_amounts = defaultdict(int)
|
|
# ------ sur les balances de caisses
|
|
to_checkouts_balances = defaultdict(int)
|
|
# ------ sur les stocks d'articles
|
|
to_articles_stocks = defaultdict(int)
|
|
|
|
for ope in opes_all:
|
|
if ope.canceled_at:
|
|
# Opération déjà annulée, va pour un warning en Response
|
|
opes_already_canceled.append(ope.pk)
|
|
else:
|
|
opes.append(ope.pk)
|
|
# Si opé il y a plus de CANCEL_DURATION, permission requise
|
|
if ope.group.at + cancel_duration < timezone.now():
|
|
required_perms.add("kfet.cancel_old_operations")
|
|
|
|
# Calcul de toutes modifs à faire en cas de validation
|
|
|
|
# Pour les balances de comptes
|
|
if not ope.group.on_acc.is_cash:
|
|
to_accounts_balances[ope.group.on_acc] -= ope.amount
|
|
if ope.addcost_for and ope.addcost_amount:
|
|
to_accounts_balances[ope.addcost_for] -= ope.addcost_amount
|
|
# Pour les groupes d'opés
|
|
to_groups_amounts[ope.group] -= ope.amount
|
|
|
|
# Pour les balances de caisses
|
|
# Les balances de caisses dont il y a eu un relevé depuis la date
|
|
# de la commande ne doivent pas être modifiées
|
|
# TODO : Prendre en compte le dernier relevé où la caisse a été
|
|
# comptée et donc modifier les balance_old (et amount_error)
|
|
# des relevés suivants.
|
|
# Note : Dans le cas où un CheckoutStatement est mis à jour
|
|
# par `.save()`, amount_error est recalculé automatiquement,
|
|
# ce qui n'est pas le cas en faisant un update sur queryset
|
|
# TODO ? : Maj les balance_old de relevés pour modifier l'erreur
|
|
last_statement = (
|
|
CheckoutStatement.objects.filter(checkout=ope.group.checkout)
|
|
.order_by("at")
|
|
.last()
|
|
)
|
|
if not last_statement or last_statement.at < ope.group.at:
|
|
if ope.is_checkout:
|
|
if ope.group.on_acc.is_cash:
|
|
to_checkouts_balances[ope.group.checkout] -= -ope.amount
|
|
else:
|
|
to_checkouts_balances[ope.group.checkout] -= ope.amount
|
|
|
|
# Pour les stocks d'articles
|
|
# Les stocks d'articles dont il y a eu un inventaire depuis la date
|
|
# de la commande ne doivent pas être modifiés
|
|
# TODO : Prendre en compte le dernier inventaire où le stock a bien
|
|
# été compté (pas dans le cas d'une livraison).
|
|
# Note : si InventoryArticle est maj par .save(), stock_error
|
|
# est recalculé automatiquement
|
|
if ope.article and ope.article_nb:
|
|
last_stock = (
|
|
InventoryArticle.objects.select_related("inventory")
|
|
.filter(article=ope.article)
|
|
.order_by("inventory__at")
|
|
.last()
|
|
)
|
|
if not last_stock or last_stock.inventory.at < ope.group.at:
|
|
to_articles_stocks[ope.article] += ope.article_nb
|
|
|
|
if not opes:
|
|
data["warnings"]["already_canceled"] = opes_already_canceled
|
|
return JsonResponse(data)
|
|
|
|
negative_accounts = []
|
|
# Checking permissions or stop
|
|
for account in to_accounts_balances:
|
|
(perms, stop) = account.perms_to_perform_operation(
|
|
amount=to_accounts_balances[account]
|
|
)
|
|
required_perms |= perms
|
|
if stop:
|
|
negative_accounts.append(account.trigramme)
|
|
|
|
if negative_accounts:
|
|
data["errors"].append(
|
|
{
|
|
"code": "negative",
|
|
"message": "Solde insuffisant pour les comptes suivants : {}".format(
|
|
", ".join(negative_accounts)
|
|
),
|
|
}
|
|
)
|
|
return JsonResponse(data, status=400)
|
|
|
|
if not request.user.has_perms(required_perms):
|
|
data["missing_perms"] = get_missing_perms(required_perms, request.user)
|
|
return JsonResponse(data, status=403)
|
|
|
|
canceled_by = required_perms and request.user.profile.account_kfet or None
|
|
canceled_at = timezone.now()
|
|
|
|
with transaction.atomic():
|
|
(
|
|
Operation.objects.filter(pk__in=opes).update(
|
|
canceled_by=canceled_by, canceled_at=canceled_at
|
|
)
|
|
)
|
|
for account in to_accounts_balances:
|
|
(
|
|
Account.objects.filter(pk=account.pk).update(
|
|
balance=F("balance") + to_accounts_balances[account]
|
|
)
|
|
)
|
|
if not account.is_cash:
|
|
# Should always be true, but we want to be sure
|
|
account.refresh_from_db()
|
|
account.update_negative()
|
|
for checkout in to_checkouts_balances:
|
|
Checkout.objects.filter(pk=checkout.pk).update(
|
|
balance=F("balance") + to_checkouts_balances[checkout]
|
|
)
|
|
for group in to_groups_amounts:
|
|
OperationGroup.objects.filter(pk=group.pk).update(
|
|
amount=F("amount") + to_groups_amounts[group]
|
|
)
|
|
for article in to_articles_stocks:
|
|
Article.objects.filter(pk=article.pk).update(
|
|
stock=F("stock") + to_articles_stocks[article]
|
|
)
|
|
|
|
# Need refresh from db cause we used update on querysets.
|
|
# Sort objects by pk to get deterministic responses.
|
|
opegroups_pk = [opegroup.pk for opegroup in to_groups_amounts]
|
|
opegroups = (
|
|
OperationGroup.objects.values("id", "amount", "is_cof")
|
|
.filter(pk__in=opegroups_pk)
|
|
.order_by("pk")
|
|
)
|
|
opes = (
|
|
Operation.objects.values("id", "canceled_at", "canceled_by__trigramme")
|
|
.filter(pk__in=opes)
|
|
.order_by("pk")
|
|
)
|
|
checkouts_pk = [checkout.pk for checkout in to_checkouts_balances]
|
|
checkouts = (
|
|
Checkout.objects.values("id", "balance")
|
|
.filter(pk__in=checkouts_pk)
|
|
.order_by("pk")
|
|
)
|
|
articles_pk = [article.pk for articles in to_articles_stocks]
|
|
articles = Article.objects.values("id", "stock").filter(pk__in=articles_pk)
|
|
|
|
# Websocket data
|
|
websocket_data = {"checkouts": [], "articles": [], "type": "kpsul"}
|
|
for checkout in checkouts:
|
|
websocket_data["checkouts"].append(
|
|
{"id": checkout["id"], "balance": checkout["balance"]}
|
|
)
|
|
for article in articles:
|
|
websocket_data["articles"].append(
|
|
{"id": article["id"], "stock": article["stock"]}
|
|
)
|
|
|
|
channel_layer = get_channel_layer()
|
|
|
|
async_to_sync(channel_layer.group_send)("kfet.kpsul", websocket_data)
|
|
|
|
data["canceled"] = list(opes)
|
|
data["opegroups_to_update"] = list(opegroups)
|
|
if opes_already_canceled:
|
|
data["warnings"]["already_canceled"] = opes_already_canceled
|
|
return JsonResponse(data)
|
|
|
|
|
|
def get_history_limit(user) -> Tuple[datetime, datetime]:
|
|
"""returns a tuple of 2 dates
|
|
- the earliest date the given user can view history of any account
|
|
- the earliest date the given user can view history of special accounts
|
|
(LIQ and #13)"""
|
|
now = timezone.now()
|
|
if user.has_perm("kfet.access_old_history"):
|
|
return (
|
|
now - settings.KFET_HISTORY_LONG_DATE_LIMIT,
|
|
settings.KFET_HISTORY_NO_DATE_LIMIT,
|
|
)
|
|
if user.has_perm("kfet.is_team"):
|
|
limit = now - settings.KFET_HISTORY_DATE_LIMIT
|
|
return limit, limit
|
|
# should not happen - future earliest date
|
|
future = now + timedelta(days=1)
|
|
return future, future
|
|
|
|
|
|
@login_required
|
|
def history_json(request):
|
|
# Récupération des paramètres
|
|
form = FilterHistoryForm(request.GET)
|
|
|
|
if not form.is_valid():
|
|
return HttpResponseBadRequest()
|
|
|
|
start = form.cleaned_data["start"]
|
|
end = form.cleaned_data["end"]
|
|
account = form.cleaned_data["account"]
|
|
checkout = form.cleaned_data["checkout"]
|
|
transfers_only = form.cleaned_data["transfers_only"]
|
|
opes_only = form.cleaned_data["opes_only"]
|
|
|
|
# Construction de la requête (sur les transferts) pour le prefetch
|
|
|
|
transfer_queryset_prefetch = Transfer.objects.select_related(
|
|
"from_acc", "to_acc", "canceled_by"
|
|
)
|
|
|
|
# Le check sur les comptes est dans le prefetch pour les transferts
|
|
if account:
|
|
transfer_queryset_prefetch = transfer_queryset_prefetch.filter(
|
|
Q(from_acc=account) | Q(to_acc=account)
|
|
)
|
|
|
|
if not request.user.has_perm("kfet.is_team"):
|
|
try:
|
|
acc = request.user.profile.account_kfet
|
|
transfer_queryset_prefetch = transfer_queryset_prefetch.filter(
|
|
Q(from_acc=acc) | Q(to_acc=acc)
|
|
)
|
|
except Account.DoesNotExist:
|
|
return JsonResponse({}, status=403)
|
|
|
|
transfer_prefetch = Prefetch(
|
|
"transfers", queryset=transfer_queryset_prefetch, to_attr="filtered_transfers"
|
|
)
|
|
|
|
# Construction de la requête (sur les opérations) pour le prefetch
|
|
ope_queryset_prefetch = Operation.objects.select_related(
|
|
"article", "canceled_by", "addcost_for"
|
|
)
|
|
ope_prefetch = Prefetch("opes", queryset=ope_queryset_prefetch)
|
|
|
|
# Construction de la requête principale
|
|
opegroups = (
|
|
OperationGroup.objects.prefetch_related(ope_prefetch)
|
|
.select_related("on_acc", "valid_by")
|
|
.order_by("at")
|
|
)
|
|
transfergroups = (
|
|
TransferGroup.objects.prefetch_related(transfer_prefetch)
|
|
.select_related("valid_by")
|
|
.order_by("at")
|
|
)
|
|
|
|
# limite l'accès à l'historique plus vieux que settings.KFET_HISTORY_DATE_LIMIT
|
|
limit_date = True
|
|
|
|
# Application des filtres
|
|
if start:
|
|
opegroups = opegroups.filter(at__gte=start)
|
|
transfergroups = transfergroups.filter(at__gte=start)
|
|
if end:
|
|
opegroups = opegroups.filter(at__lt=end)
|
|
transfergroups = transfergroups.filter(at__lt=end)
|
|
if checkout:
|
|
opegroups = opegroups.filter(checkout=checkout)
|
|
transfergroups = TransferGroup.objects.none()
|
|
if transfers_only:
|
|
opegroups = OperationGroup.objects.none()
|
|
if opes_only:
|
|
transfergroups = TransferGroup.objects.none()
|
|
if account:
|
|
opegroups = opegroups.filter(on_acc=account)
|
|
if account.user == request.user:
|
|
limit_date = False # pas de limite de date sur son propre historique
|
|
# Un non-membre de l'équipe n'a que accès à son historique
|
|
elif not request.user.has_perm("kfet.is_team"):
|
|
# un non membre de la kfet doit avoir le champ account
|
|
# pré-rempli, cette requête est douteuse
|
|
return JsonResponse({}, status=403)
|
|
if limit_date:
|
|
# limiter l'accès à l'historique ancien pour confidentialité
|
|
earliest_date, earliest_date_no_limit = get_history_limit(request.user)
|
|
if (
|
|
account
|
|
and account.trigramme in settings.KFET_HISTORY_NO_DATE_LIMIT_TRIGRAMMES
|
|
):
|
|
earliest_date = earliest_date_no_limit
|
|
opegroups = opegroups.filter(at__gte=earliest_date)
|
|
transfergroups = transfergroups.filter(at__gte=earliest_date)
|
|
|
|
# Construction de la réponse
|
|
history_groups = []
|
|
for opegroup in opegroups:
|
|
opegroup_dict = {
|
|
"type": "operation",
|
|
"id": opegroup.id,
|
|
"amount": opegroup.amount,
|
|
"at": opegroup.at,
|
|
"checkout_id": opegroup.checkout_id,
|
|
"is_cof": opegroup.is_cof,
|
|
"comment": opegroup.comment,
|
|
"entries": [],
|
|
"on_acc__trigramme": opegroup.on_acc and opegroup.on_acc.trigramme or None,
|
|
}
|
|
if request.user.has_perm("kfet.is_team"):
|
|
opegroup_dict["valid_by__trigramme"] = (
|
|
opegroup.valid_by and opegroup.valid_by.trigramme or None
|
|
)
|
|
for ope in opegroup.opes.all():
|
|
ope_dict = {
|
|
"id": ope.id,
|
|
"type": ope.type,
|
|
"amount": ope.amount,
|
|
"article_nb": ope.article_nb,
|
|
"addcost_amount": ope.addcost_amount,
|
|
"canceled_at": ope.canceled_at,
|
|
"article__name": ope.article and ope.article.name or None,
|
|
"addcost_for__trigramme": ope.addcost_for
|
|
and ope.addcost_for.trigramme
|
|
or None,
|
|
}
|
|
if request.user.has_perm("kfet.is_team"):
|
|
ope_dict["canceled_by__trigramme"] = (
|
|
ope.canceled_by and ope.canceled_by.trigramme or None
|
|
)
|
|
opegroup_dict["entries"].append(ope_dict)
|
|
history_groups.append(opegroup_dict)
|
|
for transfergroup in transfergroups:
|
|
if transfergroup.filtered_transfers:
|
|
transfergroup_dict = {
|
|
"type": "transfer",
|
|
"id": transfergroup.id,
|
|
"at": transfergroup.at,
|
|
"comment": transfergroup.comment,
|
|
"entries": [],
|
|
}
|
|
if request.user.has_perm("kfet.is_team"):
|
|
transfergroup_dict["valid_by__trigramme"] = (
|
|
transfergroup.valid_by and transfergroup.valid_by.trigramme or None
|
|
)
|
|
|
|
for transfer in transfergroup.filtered_transfers:
|
|
transfer_dict = {
|
|
"id": transfer.id,
|
|
"amount": transfer.amount,
|
|
"canceled_at": transfer.canceled_at,
|
|
"from_acc": transfer.from_acc.trigramme,
|
|
"to_acc": transfer.to_acc.trigramme,
|
|
}
|
|
if request.user.has_perm("kfet.is_team"):
|
|
transfer_dict["canceled_by__trigramme"] = (
|
|
transfer.canceled_by and transfer.canceled_by.trigramme or None
|
|
)
|
|
transfergroup_dict["entries"].append(transfer_dict)
|
|
history_groups.append(transfergroup_dict)
|
|
|
|
history_groups.sort(key=lambda group: group["at"])
|
|
|
|
return JsonResponse({"groups": history_groups})
|
|
|
|
|
|
@teamkfet_required
|
|
def kpsul_articles_data(request):
|
|
articles = Article.objects.values(
|
|
"id",
|
|
"name",
|
|
"price",
|
|
"stock",
|
|
"category_id",
|
|
"category__name",
|
|
"category__has_addcost",
|
|
"category__has_reduction",
|
|
).filter(is_sold=True)
|
|
return JsonResponse({"articles": list(articles)})
|
|
|
|
|
|
@teamkfet_required
|
|
def history(request):
|
|
# These limits are only useful for JS datepickers
|
|
# They don't enforce anything and can be bypassed
|
|
# Serious checks are done in history_json
|
|
history_limit, history_no_limit = get_history_limit(request.user)
|
|
history_no_limit_account_ids = Account.objects.filter(
|
|
trigramme__in=settings.KFET_HISTORY_NO_DATE_LIMIT_TRIGRAMMES
|
|
).values_list("id", flat=True)
|
|
format_date = lambda date: date.strftime("%Y-%m-%d %H:%M")
|
|
data = {
|
|
"filter_form": FilterHistoryForm(),
|
|
"history_limit": format_date(history_limit),
|
|
"history_no_limit_account_ids": history_no_limit_account_ids,
|
|
"history_no_limit": format_date(history_no_limit),
|
|
}
|
|
return render(request, "kfet/history.html", data)
|
|
|
|
|
|
# -----
|
|
# Settings views
|
|
# -----
|
|
|
|
|
|
class SettingsList(TemplateView):
|
|
template_name = "kfet/settings.html"
|
|
|
|
|
|
config_list = permission_required("kfet.see_config")(SettingsList.as_view())
|
|
|
|
|
|
@method_decorator(kfet_password_auth, name="dispatch")
|
|
class SettingsUpdate(SuccessMessageMixin, FormView):
|
|
form_class = KFetConfigForm
|
|
template_name = "kfet/settings_update.html"
|
|
success_message = "Paramètres mis à jour"
|
|
success_url = reverse_lazy("kfet.settings")
|
|
|
|
def form_valid(self, form):
|
|
# Checking permission
|
|
if not self.request.user.has_perm("kfet.change_config"):
|
|
form.add_error(
|
|
None, ValidationError("Permission refusée", code="permission-denied")
|
|
)
|
|
return self.form_invalid(form)
|
|
form.save()
|
|
return super().form_valid(form)
|
|
|
|
|
|
config_update = permission_required("kfet.change_config")(SettingsUpdate.as_view())
|
|
|
|
|
|
# -----
|
|
# Transfer views
|
|
# -----
|
|
|
|
|
|
@method_decorator(teamkfet_required, name="dispatch")
|
|
class TransferView(TemplateView):
|
|
template_name = "kfet/transfers.html"
|
|
|
|
|
|
@teamkfet_required
|
|
def transfers_create(request):
|
|
transfer_formset = TransferFormSet(queryset=Transfer.objects.none())
|
|
return render(
|
|
request, "kfet/transfers_create.html", {"transfer_formset": transfer_formset}
|
|
)
|
|
|
|
|
|
@teamkfet_required
|
|
@kfet_password_auth
|
|
def perform_transfers(request):
|
|
data = {"errors": []}
|
|
|
|
# Checking transfer_formset
|
|
transfer_formset = TransferFormSet(request.POST)
|
|
try:
|
|
if not transfer_formset.is_valid():
|
|
for form_errors in transfer_formset.errors:
|
|
for field, errors in form_errors.items():
|
|
if field == "amount":
|
|
for error in errors:
|
|
data["errors"].append({"code": "amount", "message": error})
|
|
else:
|
|
# C'est compliqué de trouver le compte qui pose problème...
|
|
acc_error = True
|
|
|
|
if acc_error:
|
|
data["errors"].append(
|
|
{
|
|
"code": "invalid_acc",
|
|
"message": "L'un des comptes est invalide ou manquant",
|
|
}
|
|
)
|
|
|
|
return JsonResponse(data, status=400)
|
|
|
|
except ValidationError:
|
|
data["errors"].append(
|
|
{"code": "invalid_request", "message": "Requête invalide"}
|
|
)
|
|
return JsonResponse(data, status=400)
|
|
|
|
transfers = transfer_formset.save(commit=False)
|
|
|
|
# Initializing vars
|
|
required_perms = set(
|
|
["kfet.add_transfer"]
|
|
) # Required perms to perform all transfers
|
|
to_accounts_balances = defaultdict(int) # For balances of accounts
|
|
|
|
for transfer in transfers:
|
|
to_accounts_balances[transfer.from_acc] -= transfer.amount
|
|
to_accounts_balances[transfer.to_acc] += transfer.amount
|
|
|
|
negative_accounts = []
|
|
# Checking if ok on all accounts
|
|
frozen = set()
|
|
for account in to_accounts_balances:
|
|
if account.is_frozen:
|
|
frozen.add(account.trigramme)
|
|
|
|
(perms, stop) = account.perms_to_perform_operation(
|
|
amount=to_accounts_balances[account]
|
|
)
|
|
required_perms |= perms
|
|
if stop:
|
|
negative_accounts.append(account.trigramme)
|
|
|
|
if frozen:
|
|
data["errors"].append(
|
|
{
|
|
"code": "frozen",
|
|
"message": "Les comptes suivants sont gelés : {}".format(
|
|
", ".join(frozen)
|
|
),
|
|
}
|
|
)
|
|
|
|
if negative_accounts:
|
|
data["errors"].append(
|
|
{
|
|
"code": "negative",
|
|
"message": "Solde insuffisant pour les comptes suivants : {}".format(
|
|
", ".join(negative_accounts)
|
|
),
|
|
}
|
|
)
|
|
|
|
if data["errors"]:
|
|
return JsonResponse(data, status=400)
|
|
|
|
if not request.user.has_perms(required_perms):
|
|
data["missing_perms"] = get_missing_perms(required_perms, request.user)
|
|
return JsonResponse(data, status=403)
|
|
|
|
# Creating transfer group
|
|
transfergroup = TransferGroup()
|
|
if required_perms:
|
|
transfergroup.valid_by = request.user.profile.account_kfet
|
|
|
|
comment = request.POST.get("comment", "")
|
|
transfergroup.comment = comment.strip()
|
|
|
|
with transaction.atomic():
|
|
# Updating balances accounts
|
|
for account in to_accounts_balances:
|
|
Account.objects.filter(pk=account.pk).update(
|
|
balance=F("balance") + to_accounts_balances[account]
|
|
)
|
|
account.refresh_from_db()
|
|
account.update_negative()
|
|
|
|
# Saving transfer group
|
|
transfergroup.save()
|
|
|
|
# Saving all transfers with group
|
|
for transfer in transfers:
|
|
transfer.group = transfergroup
|
|
transfer.save()
|
|
|
|
return JsonResponse({})
|
|
|
|
|
|
@teamkfet_required
|
|
@kfet_password_auth
|
|
def cancel_transfers(request):
|
|
# Pour la réponse
|
|
data = {"canceled": [], "warnings": {}, "errors": []}
|
|
|
|
# Checking if BAD REQUEST (transfers_pk not int or not existing)
|
|
try:
|
|
# Set pour virer les doublons
|
|
transfers_post = set(
|
|
map(int, filter(None, request.POST.getlist("transfers[]", [])))
|
|
)
|
|
except ValueError:
|
|
data["errors"].append(
|
|
{"code": "invalid_request", "message": "Requête invalide !"}
|
|
)
|
|
return JsonResponse(data, status=400)
|
|
|
|
transfers_all = Transfer.objects.select_related(
|
|
"group", "from_acc", "from_acc__negative", "to_acc", "to_acc__negative"
|
|
).filter(pk__in=transfers_post)
|
|
transfers_pk = [transfer.pk for transfer in transfers_all]
|
|
transfers_notexisting = [
|
|
transfer for transfer in transfers_post if transfer not in transfers_pk
|
|
]
|
|
if transfers_notexisting:
|
|
data["errors"].append(
|
|
{
|
|
"code": "cancel_missing",
|
|
"message": "Transferts inexistants : {}".format(
|
|
", ".join(map(str, transfers_notexisting))
|
|
),
|
|
}
|
|
)
|
|
return JsonResponse(data, status=400)
|
|
|
|
transfers_already_canceled = [] # Déjà annulés
|
|
transfers = [] # Pas déjà annulés
|
|
required_perms = set()
|
|
cancel_duration = kfet_config.cancel_duration
|
|
|
|
# Modifs à faire sur les balances des comptes
|
|
to_accounts_balances = defaultdict(int)
|
|
for transfer in transfers_all:
|
|
if transfer.canceled_at:
|
|
# Transfert déjà annulé, va pour un warning en Response
|
|
transfers_already_canceled.append(transfer.pk)
|
|
else:
|
|
transfers.append(transfer.pk)
|
|
# Si transfer il y a plus de CANCEL_DURATION, permission requise
|
|
if transfer.group.at + cancel_duration < timezone.now():
|
|
required_perms.add("kfet.cancel_old_operations")
|
|
|
|
# Calcul de toutes modifs à faire en cas de validation
|
|
|
|
# Pour les balances de comptes
|
|
to_accounts_balances[transfer.from_acc] += transfer.amount
|
|
to_accounts_balances[transfer.to_acc] += -transfer.amount
|
|
|
|
if not transfers:
|
|
data["warnings"]["already_canceled"] = transfers_already_canceled
|
|
return JsonResponse(data)
|
|
|
|
negative_accounts = []
|
|
# Checking permissions or stop
|
|
for account in to_accounts_balances:
|
|
(perms, stop) = account.perms_to_perform_operation(
|
|
amount=to_accounts_balances[account]
|
|
)
|
|
required_perms |= perms
|
|
if stop:
|
|
negative_accounts.append(account.trigramme)
|
|
|
|
if negative_accounts:
|
|
data["errors"].append(
|
|
{
|
|
"code": "negative",
|
|
"message": "Solde insuffisant pour les comptes suivants : {}".format(
|
|
", ".join(negative_accounts)
|
|
),
|
|
}
|
|
)
|
|
return JsonResponse(data, status=400)
|
|
|
|
if not request.user.has_perms(required_perms):
|
|
data["missing_perms"] = get_missing_perms(required_perms, request.user)
|
|
return JsonResponse(data, status=403)
|
|
|
|
canceled_by = required_perms and request.user.profile.account_kfet or None
|
|
canceled_at = timezone.now()
|
|
|
|
with transaction.atomic():
|
|
(
|
|
Transfer.objects.filter(pk__in=transfers).update(
|
|
canceled_by=canceled_by, canceled_at=canceled_at
|
|
)
|
|
)
|
|
|
|
for account in to_accounts_balances:
|
|
Account.objects.filter(pk=account.pk).update(
|
|
balance=F("balance") + to_accounts_balances[account]
|
|
)
|
|
account.refresh_from_db()
|
|
account.update_negative()
|
|
|
|
transfers = (
|
|
Transfer.objects.values("id", "canceled_at", "canceled_by__trigramme")
|
|
.filter(pk__in=transfers)
|
|
.order_by("pk")
|
|
)
|
|
data["canceled"] = list(transfers)
|
|
if transfers_already_canceled:
|
|
data["warnings"]["already_canceled"] = transfers_already_canceled
|
|
return JsonResponse(data)
|
|
|
|
|
|
class InventoryList(ListView):
|
|
queryset = (
|
|
Inventory.objects.select_related("by", "order")
|
|
.annotate(nb_articles=Count("articles"))
|
|
.order_by("-at")
|
|
)
|
|
template_name = "kfet/inventory.html"
|
|
context_object_name = "inventories"
|
|
|
|
|
|
@teamkfet_required
|
|
@kfet_password_auth
|
|
def inventory_create(request):
|
|
articles = Article.objects.select_related("category").order_by(
|
|
"-is_sold", "category__name", "name"
|
|
)
|
|
initial = []
|
|
for article in articles:
|
|
initial.append(
|
|
{
|
|
"is_sold": article.is_sold,
|
|
"article": article.pk,
|
|
"stock_old": article.stock,
|
|
"name": article.name,
|
|
"category": article.category_id,
|
|
"category__name": article.category.name,
|
|
"box_capacity": article.box_capacity or 0,
|
|
}
|
|
)
|
|
|
|
cls_formset = formset_factory(form=InventoryArticleForm, extra=0)
|
|
|
|
if request.POST:
|
|
formset = cls_formset(request.POST, initial=initial)
|
|
|
|
if not request.user.has_perm("kfet.add_inventory"):
|
|
messages.error(
|
|
request, "Permission refusée", extra_tags="permission-denied"
|
|
)
|
|
elif formset.is_valid():
|
|
with transaction.atomic():
|
|
articles = Article.objects.select_for_update()
|
|
inventory = Inventory()
|
|
inventory.by = request.user.profile.account_kfet
|
|
saved = False
|
|
for form in formset:
|
|
if form.cleaned_data["stock_new"] is not None:
|
|
if not saved:
|
|
inventory.save()
|
|
saved = True
|
|
|
|
article = articles.get(pk=form.cleaned_data["article"].pk)
|
|
stock_old = article.stock
|
|
stock_new = form.cleaned_data["stock_new"]
|
|
InventoryArticle.objects.create(
|
|
inventory=inventory,
|
|
article=article,
|
|
stock_old=stock_old,
|
|
stock_new=stock_new,
|
|
)
|
|
article.stock = stock_new
|
|
article.save()
|
|
if saved:
|
|
messages.success(request, "Inventaire créé")
|
|
return redirect("kfet.inventory")
|
|
messages.warning(request, "Bah alors ? On a rien compté ?")
|
|
else:
|
|
messages.error(request, "Pas marché")
|
|
else:
|
|
formset = cls_formset(initial=initial)
|
|
|
|
return render(request, "kfet/inventory_create.html", {"formset": formset})
|
|
|
|
|
|
class InventoryRead(DetailView):
|
|
model = Inventory
|
|
template_name = "kfet/inventory_read.html"
|
|
context_object_name = "inventory"
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super().get_context_data(**kwargs)
|
|
output_field = DecimalField(max_digits=10, decimal_places=2, default=0)
|
|
inventory_articles = (
|
|
InventoryArticle.objects.select_related("article", "article__category")
|
|
.filter(inventory=self.object)
|
|
.annotate(
|
|
amount_error=ExpressionWrapper(
|
|
F("stock_error") * F("article__price"), output_field=output_field
|
|
)
|
|
)
|
|
.order_by("article__category__name", "article__name")
|
|
)
|
|
context["inventoryarts"] = inventory_articles
|
|
stats = inventory_articles.aggregate(
|
|
new=ExpressionWrapper(
|
|
Sum(F("stock_new") * F("article__price")), output_field=output_field
|
|
),
|
|
error=Sum("amount_error"),
|
|
old=ExpressionWrapper(
|
|
Sum(F("stock_old") * F("article__price")), output_field=output_field
|
|
),
|
|
)
|
|
context.update(
|
|
{
|
|
"total_amount_old": stats["old"],
|
|
"total_amount_new": stats["new"],
|
|
"total_amount_error": stats["error"],
|
|
}
|
|
)
|
|
return context
|
|
|
|
|
|
class InventoryDelete(PermissionRequiredMixin, DeleteView):
|
|
model = Inventory
|
|
success_url = reverse_lazy("kfet.inventory")
|
|
success_message = "Inventaire annulé avec succès !"
|
|
permission_required = "kfet.delete_inventory"
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
return redirect("kfet.inventory.read", self.kwargs.get(self.pk_url_kwarg))
|
|
|
|
def delete(self, request, *args, **kwargs):
|
|
inv = self.get_object()
|
|
# On met à jour les articles dont c'est le dernier inventaire
|
|
# .get() ne marche pas avec OuterRef, donc on utilise .filter() avec [:1]
|
|
update_subquery = InventoryArticle.objects.filter(
|
|
inventory=inv, article=OuterRef("pk")
|
|
).values("stock_old")[:1]
|
|
|
|
Article.objects.annotate(last_env=Max("inventories__at")).filter(
|
|
last_env=inv.at
|
|
).update(stock=Subquery(update_subquery))
|
|
|
|
# On a tout mis à jour, on peut delete (avec un message)
|
|
messages.success(request, self.success_message)
|
|
return super().delete(request, *args, **kwargs)
|
|
|
|
|
|
# -----
|
|
# Order views
|
|
# -----
|
|
|
|
|
|
class OrderList(ListView):
|
|
queryset = Order.objects.select_related("supplier", "inventory")
|
|
template_name = "kfet/order.html"
|
|
context_object_name = "orders"
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super().get_context_data(**kwargs)
|
|
context["suppliers"] = Supplier.objects.order_by("name")
|
|
return context
|
|
|
|
|
|
@teamkfet_required
|
|
@kfet_password_auth
|
|
def order_create(request, pk):
|
|
supplier = get_object_or_404(Supplier, pk=pk)
|
|
|
|
articles = (
|
|
Article.objects.filter(suppliers=supplier.pk)
|
|
.distinct()
|
|
.select_related("category")
|
|
.order_by("-is_sold", "category__name", "name")
|
|
)
|
|
|
|
# Force hit to cache
|
|
articles = list(articles)
|
|
|
|
sales_q = (
|
|
Operation.objects.select_related("group")
|
|
.filter(article__in=articles, canceled_at=None)
|
|
.values("article")
|
|
.annotate(nb=Sum("article_nb"))
|
|
)
|
|
scale = WeekScale(last=True, n_steps=5, std_chunk=False)
|
|
chunks = scale.chunkify_qs(sales_q, field="group__at")
|
|
|
|
sales = [{d["article"]: d["nb"] for d in chunk} for chunk in chunks]
|
|
|
|
initial = []
|
|
|
|
for article in articles:
|
|
# Get sales for each 5 last weeks
|
|
v_all = [chunk.get(article.pk, 0) for chunk in sales]
|
|
# Take the 3 greatest (eg to avoid 2 weeks of vacations)
|
|
v_3max = heapq.nlargest(3, v_all)
|
|
# Get average and standard deviation
|
|
v_moy = statistics.mean(v_3max)
|
|
v_et = statistics.pstdev(v_3max, v_moy)
|
|
# Expected sales for next week
|
|
v_prev = v_moy + v_et
|
|
# We want to have 1.5 * the expected sales in stock
|
|
# (because sometimes some articles are not delivered)
|
|
c_rec_tot = max(v_prev * 1.5 - article.stock, 0)
|
|
# If ordered quantity is close enough to a level which can led to free
|
|
# boxes, we increase it to this level.
|
|
if article.box_capacity:
|
|
c_rec_temp = c_rec_tot / article.box_capacity
|
|
if c_rec_temp >= 10:
|
|
c_rec = round(c_rec_temp)
|
|
elif c_rec_temp > 5:
|
|
c_rec = 10
|
|
elif c_rec_temp > 2:
|
|
c_rec = 5
|
|
else:
|
|
c_rec = round(c_rec_temp)
|
|
initial.append(
|
|
{
|
|
"article": article.pk,
|
|
"name": article.name,
|
|
"category": article.category_id,
|
|
"category__name": article.category.name,
|
|
"stock": article.stock,
|
|
"box_capacity": article.box_capacity,
|
|
"v_all": v_all,
|
|
"v_moy": round(v_moy),
|
|
"v_et": round(v_et),
|
|
"v_prev": round(v_prev),
|
|
"c_rec": article.box_capacity and c_rec or round(c_rec_tot),
|
|
"is_sold": article.is_sold,
|
|
}
|
|
)
|
|
|
|
cls_formset = formset_factory(form=OrderArticleForm, extra=0)
|
|
|
|
if request.POST:
|
|
formset = cls_formset(request.POST, initial=initial)
|
|
|
|
if not request.user.has_perm("kfet.add_order"):
|
|
messages.error(
|
|
request, "Permission refusée", extra_tags="permission-denied"
|
|
)
|
|
elif formset.is_valid():
|
|
order = Order()
|
|
order.supplier = supplier
|
|
saved = False
|
|
for form in formset:
|
|
if form.cleaned_data["quantity_ordered"] is not None:
|
|
if not saved:
|
|
order.save()
|
|
saved = True
|
|
|
|
article = form.cleaned_data["article"]
|
|
q_ordered = form.cleaned_data["quantity_ordered"]
|
|
if article.box_capacity:
|
|
q_ordered *= article.box_capacity
|
|
OrderArticle.objects.create(
|
|
order=order, article=article, quantity_ordered=q_ordered
|
|
)
|
|
if saved:
|
|
messages.success(request, "Commande créée")
|
|
return redirect("kfet.order.read", order.pk)
|
|
messages.warning(request, "Rien commandé => Pas de commande")
|
|
else:
|
|
messages.error(request, "Corrigez les erreurs")
|
|
else:
|
|
formset = cls_formset(initial=initial)
|
|
|
|
scale.label_fmt = "S-{rev_i}"
|
|
|
|
return render(
|
|
request,
|
|
"kfet/order_create.html",
|
|
{"supplier": supplier, "formset": formset, "scale": scale},
|
|
)
|
|
|
|
|
|
class OrderRead(DetailView):
|
|
model = Order
|
|
template_name = "kfet/order_read.html"
|
|
context_object_name = "order"
|
|
|
|
def get_context_data(self, **kwargs):
|
|
context = super().get_context_data(**kwargs)
|
|
orderarticles = (
|
|
OrderArticle.objects.select_related("article", "article__category")
|
|
.filter(order=self.object)
|
|
.order_by("article__category__name", "article__name")
|
|
)
|
|
context["orderarts"] = orderarticles
|
|
mail = (
|
|
"Bonjour,\n\nNous voudrions pour le ##DATE## à la K-Fêt de " "l'ENS Ulm :"
|
|
)
|
|
category = 0
|
|
for orderarticle in orderarticles:
|
|
if category != orderarticle.article.category:
|
|
category = orderarticle.article.category
|
|
mail += "\n"
|
|
nb = orderarticle.quantity_ordered
|
|
box = ""
|
|
if orderarticle.article.box_capacity:
|
|
nb /= orderarticle.article.box_capacity
|
|
if nb >= 2:
|
|
box = " %ss de" % orderarticle.article.box_type
|
|
else:
|
|
box = " %s de" % orderarticle.article.box_type
|
|
name = orderarticle.article.name.capitalize()
|
|
mail += "\n- %s%s %s" % (round(nb), box, name)
|
|
|
|
mail += (
|
|
"\n\nMerci d'appeler le numéro suivant lorsque les livreurs "
|
|
"sont là : ##TELEPHONE##\nCordialement,\n##PRENOM## ##NOM## "
|
|
", pour la K-Fêt de l'ENS Ulm"
|
|
)
|
|
|
|
context["mail"] = mail
|
|
return context
|
|
|
|
|
|
@teamkfet_required
|
|
@kfet_password_auth
|
|
def order_to_inventory(request, pk):
|
|
order = get_object_or_404(Order, pk=pk)
|
|
|
|
if hasattr(order, "inventory"):
|
|
raise Http404
|
|
|
|
supplier_prefetch = Prefetch(
|
|
"article__supplierarticle_set",
|
|
queryset=(
|
|
SupplierArticle.objects.filter(supplier=order.supplier).order_by("-at")
|
|
),
|
|
to_attr="supplier",
|
|
)
|
|
|
|
order_articles = (
|
|
OrderArticle.objects.filter(order=order.pk)
|
|
.select_related("article", "article__category")
|
|
.prefetch_related(supplier_prefetch)
|
|
.order_by("article__category__name", "article__name")
|
|
)
|
|
|
|
initial = []
|
|
for order_article in order_articles:
|
|
article = order_article.article
|
|
initial.append(
|
|
{
|
|
"article": article.pk,
|
|
"name": article.name,
|
|
"category": article.category_id,
|
|
"category__name": article.category.name,
|
|
"quantity_ordered": order_article.quantity_ordered,
|
|
"quantity_received": order_article.quantity_ordered,
|
|
"price_HT": article.supplier[0].price_HT,
|
|
"TVA": article.supplier[0].TVA,
|
|
"rights": article.supplier[0].rights,
|
|
}
|
|
)
|
|
|
|
cls_formset = formset_factory(OrderArticleToInventoryForm, extra=0)
|
|
|
|
if request.method == "POST":
|
|
formset = cls_formset(request.POST, initial=initial)
|
|
|
|
if not request.user.has_perm("kfet.order_to_inventory"):
|
|
messages.error(
|
|
request, "Permission refusée", extra_tags="permission-denied"
|
|
)
|
|
elif formset.is_valid():
|
|
with transaction.atomic():
|
|
inventory = Inventory.objects.create(
|
|
order=order, by=request.user.profile.account_kfet
|
|
)
|
|
new_supplierarticle = []
|
|
new_inventoryarticle = []
|
|
for form in formset:
|
|
q_received = form.cleaned_data["quantity_received"]
|
|
article = form.cleaned_data["article"]
|
|
|
|
price_HT = form.cleaned_data["price_HT"]
|
|
TVA = form.cleaned_data["TVA"]
|
|
rights = form.cleaned_data["rights"]
|
|
|
|
if any(
|
|
(
|
|
form.initial["price_HT"] != price_HT,
|
|
form.initial["TVA"] != TVA,
|
|
form.initial["rights"] != rights,
|
|
)
|
|
):
|
|
new_supplierarticle.append(
|
|
SupplierArticle(
|
|
supplier=order.supplier,
|
|
article=article,
|
|
price_HT=price_HT,
|
|
TVA=TVA,
|
|
rights=rights,
|
|
)
|
|
)
|
|
(
|
|
OrderArticle.objects.filter(
|
|
order=order, article=article
|
|
).update(quantity_received=q_received)
|
|
)
|
|
new_inventoryarticle.append(
|
|
InventoryArticle(
|
|
inventory=inventory,
|
|
article=article,
|
|
stock_old=article.stock,
|
|
stock_new=article.stock + q_received,
|
|
)
|
|
)
|
|
article.stock += q_received
|
|
if q_received > 0:
|
|
article.is_sold = True
|
|
article.save()
|
|
SupplierArticle.objects.bulk_create(new_supplierarticle)
|
|
InventoryArticle.objects.bulk_create(new_inventoryarticle)
|
|
messages.success(request, "C'est tout bon !")
|
|
return redirect("kfet.order")
|
|
else:
|
|
messages.error(request, "Corrigez les erreurs")
|
|
else:
|
|
formset = cls_formset(initial=initial)
|
|
|
|
return render(
|
|
request, "kfet/order_to_inventory.html", {"formset": formset, "order": order}
|
|
)
|
|
|
|
|
|
@method_decorator(kfet_password_auth, name="dispatch")
|
|
class SupplierUpdate(SuccessMessageMixin, UpdateView):
|
|
model = Supplier
|
|
template_name = "kfet/supplier_form.html"
|
|
fields = ["name", "address", "email", "phone", "comment"]
|
|
success_url = reverse_lazy("kfet.order")
|
|
sucess_message = "Données fournisseur mis à jour"
|
|
|
|
# Surcharge de la validation
|
|
def form_valid(self, form):
|
|
# Checking permission
|
|
if not self.request.user.has_perm("kfet.change_supplier"):
|
|
form.add_error(
|
|
None, ValidationError("Permission refusée", code="permission-denied")
|
|
)
|
|
return self.form_invalid(form)
|
|
# Updating
|
|
return super().form_valid(form)
|
|
|
|
|
|
# ==========
|
|
# Statistics
|
|
# ==========
|
|
|
|
|
|
# ---------------
|
|
# Vues génériques
|
|
# ---------------
|
|
# source : docs.djangoproject.com/fr/1.10/topics/class-based-views/mixins/
|
|
class JSONResponseMixin:
|
|
"""
|
|
A mixin that can be used to render a JSON response.
|
|
"""
|
|
|
|
def render_to_json_response(self, context, **response_kwargs):
|
|
"""
|
|
Returns a JSON response, transforming 'context' to make the payload.
|
|
"""
|
|
return JsonResponse(self.get_data(context), **response_kwargs)
|
|
|
|
def get_data(self, context):
|
|
"""
|
|
Returns an object that will be serialized as JSON by json.dumps().
|
|
"""
|
|
# Note: This is *EXTREMELY* naive; in reality, you'll need
|
|
# to do much more complex handling to ensure that arbitrary
|
|
# objects -- such as Django model instances or querysets
|
|
# -- can be serialized as JSON.
|
|
return context
|
|
|
|
|
|
class JSONDetailView(JSONResponseMixin, BaseDetailView):
|
|
"""Returns a DetailView that renders a JSON."""
|
|
|
|
def render_to_response(self, context):
|
|
return self.render_to_json_response(context)
|
|
|
|
|
|
class SingleResumeStat(JSONDetailView):
|
|
"""
|
|
Génère l'interface de sélection pour les statistiques d'un compte/article.
|
|
L'interface est constituée d'une série de boutons, qui récupèrent et graphent
|
|
des statistiques du même type, sur le même objet mais avec des arguments différents.
|
|
|
|
Attributs :
|
|
- url_stat : URL où récupérer les statistiques
|
|
- stats : liste de dictionnaires avec les clés suivantes :
|
|
- label : texte du bouton
|
|
- url_params : paramètres GET à rajouter à `url_stat`
|
|
- default : si `True`, graphe à montrer par défaut
|
|
|
|
On peut aussi définir `stats` dynamiquement, via la fonction `get_stats`.
|
|
"""
|
|
|
|
url_stat = None
|
|
stats = []
|
|
|
|
def get_stats(self):
|
|
return self.stats
|
|
|
|
def get_context_data(self, **kwargs):
|
|
# On n'hérite pas
|
|
context = {}
|
|
stats = []
|
|
# On peut avoir récupéré self.object via pk ou slug
|
|
if self.pk_url_kwarg in self.kwargs:
|
|
url_pk = getattr(self.object, self.pk_url_kwarg)
|
|
else:
|
|
url_pk = getattr(self.object, self.slug_url_kwarg)
|
|
|
|
for stat_def in self.get_stats():
|
|
url_params_d = stat_def.get("url_params", {})
|
|
if len(url_params_d) > 0:
|
|
url_params = "?{}".format(urlencode(url_params_d))
|
|
else:
|
|
url_params = ""
|
|
stats.append(
|
|
{
|
|
"label": stat_def["label"],
|
|
"url": "{url}{params}".format(
|
|
url=reverse(self.url_stat, args=[url_pk]), params=url_params
|
|
),
|
|
"default": stat_def.get("default", False),
|
|
}
|
|
)
|
|
context["stats"] = stats
|
|
return context
|
|
|
|
|
|
class UserAccountMixin:
|
|
"""
|
|
Mixin qui vérifie que le compte traité par la vue est celui de l'utilisateur·ice
|
|
actuel·le. Dans le cas contraire, renvoie un Http404.
|
|
"""
|
|
|
|
def get_object(self, *args, **kwargs):
|
|
obj = super().get_object(*args, **kwargs)
|
|
if self.request.user != obj.user:
|
|
raise Http404
|
|
return obj
|
|
|
|
|
|
class ScaleMixin(object):
|
|
"""Mixin pour utiliser les outils de `kfet.statistic`."""
|
|
|
|
def get_context_data(self, *args, **kwargs):
|
|
# On n'hérite pas
|
|
form = StatScaleForm(self.request.GET, prefix="scale")
|
|
|
|
if not form.is_valid():
|
|
raise SuspiciousOperation(
|
|
"Invalid StatScaleForm. Did someone tamper with the GET parameters ?"
|
|
)
|
|
|
|
scale_name = form.cleaned_data.pop("name")
|
|
scale_cls = SCALE_DICT.get(scale_name)
|
|
|
|
self.scale = scale_cls(**form.cleaned_data)
|
|
|
|
return {"labels": self.scale.get_labels()}
|
|
|
|
|
|
# -----------------------
|
|
# Evolution Balance perso
|
|
# -----------------------
|
|
|
|
|
|
@method_decorator(login_required, name="dispatch")
|
|
class AccountStatBalanceList(UserAccountMixin, SingleResumeStat):
|
|
"""
|
|
Menu général pour l'historique de balance d'un compte
|
|
"""
|
|
|
|
model = Account
|
|
slug_url_kwarg = "trigramme"
|
|
slug_field = "trigramme"
|
|
url_stat = "kfet.account.stat.balance"
|
|
stats = [
|
|
{"label": "Tout le temps"},
|
|
{"label": "1 an", "url_params": {"last_days": 365}},
|
|
{"label": "6 mois", "url_params": {"last_days": 183}},
|
|
{"label": "3 mois", "url_params": {"last_days": 90}, "default": True},
|
|
{"label": "30 jours", "url_params": {"last_days": 30}},
|
|
]
|
|
|
|
|
|
@method_decorator(login_required, name="dispatch")
|
|
class AccountStatBalance(UserAccountMixin, JSONDetailView):
|
|
"""
|
|
Statistiques (JSON) d'historique de balance d'un compte.
|
|
Prend en compte les opérations et transferts sur la période donnée.
|
|
"""
|
|
|
|
model = Account
|
|
slug_url_kwarg = "trigramme"
|
|
slug_field = "trigramme"
|
|
|
|
def get_changes_list(self, last_days=None, begin_date=None, end_date=None):
|
|
account = self.object
|
|
|
|
# prepare filters
|
|
if last_days is not None:
|
|
end_date = timezone.now()
|
|
begin_date = end_date - timezone.timedelta(days=last_days)
|
|
|
|
# prepare querysets
|
|
# TODO: retirer les opgroup dont tous les op sont annulées
|
|
opegroups = OperationGroup.objects.filter(on_acc=account)
|
|
transfers = Transfer.objects.filter(canceled_at=None).select_related("group")
|
|
recv_transfers = transfers.filter(to_acc=account)
|
|
sent_transfers = transfers.filter(from_acc=account)
|
|
|
|
# apply filters
|
|
if begin_date is not None:
|
|
opegroups = opegroups.filter(at__gte=begin_date)
|
|
recv_transfers = recv_transfers.filter(group__at__gte=begin_date)
|
|
sent_transfers = sent_transfers.filter(group__at__gte=begin_date)
|
|
|
|
if end_date is not None:
|
|
opegroups = opegroups.filter(at__lte=end_date)
|
|
recv_transfers = recv_transfers.filter(group__at__lte=end_date)
|
|
sent_transfers = sent_transfers.filter(group__at__lte=end_date)
|
|
|
|
# On transforme tout ça en une liste de dictionnaires sous la forme
|
|
# {'at': date,
|
|
# 'amount': changement de la balance (négatif si diminue la balance,
|
|
# positif si l'augmente),
|
|
# 'label': text descriptif,
|
|
# 'balance': état de la balance après l'action (0 pour le moment,
|
|
# sera mis à jour lors d'une
|
|
# autre passe)
|
|
# }
|
|
|
|
actions = []
|
|
|
|
actions.append(
|
|
{
|
|
"at": (begin_date or account.created_at).isoformat(),
|
|
"amount": 0,
|
|
"balance": 0,
|
|
}
|
|
)
|
|
actions.append(
|
|
{"at": (end_date or timezone.now()).isoformat(), "amount": 0, "balance": 0}
|
|
)
|
|
|
|
actions += (
|
|
[
|
|
{"at": ope_grp.at.isoformat(), "amount": ope_grp.amount, "balance": 0}
|
|
for ope_grp in opegroups
|
|
]
|
|
+ [
|
|
{"at": tr.group.at.isoformat(), "amount": tr.amount, "balance": 0}
|
|
for tr in recv_transfers
|
|
]
|
|
+ [
|
|
{"at": tr.group.at.isoformat(), "amount": -tr.amount, "balance": 0}
|
|
for tr in sent_transfers
|
|
]
|
|
)
|
|
# Maintenant on trie la liste des actions par ordre du plus récent
|
|
# an plus ancien et on met à jour la balance
|
|
if len(actions) > 1:
|
|
actions = sorted(actions, key=lambda k: k["at"], reverse=True)
|
|
actions[0]["balance"] = account.balance
|
|
for i in range(len(actions) - 1):
|
|
actions[i + 1]["balance"] = (
|
|
actions[i]["balance"] - actions[i + 1]["amount"]
|
|
)
|
|
return actions
|
|
|
|
def get_context_data(self, *args, **kwargs):
|
|
context = {}
|
|
|
|
form = AccountStatForm(self.request.GET)
|
|
|
|
if not form.is_valid():
|
|
raise SuspiciousOperation(
|
|
"Invalid AccountStatForm. Did someone tamper with the GET parameters ?"
|
|
)
|
|
|
|
changes = self.get_changes_list(**form.cleaned_data)
|
|
|
|
context["charts"] = [
|
|
{"color": "rgb(200, 20, 60)", "label": "Balance", "values": changes}
|
|
]
|
|
context["is_time_chart"] = True
|
|
if len(changes) > 0:
|
|
context["min_date"] = changes[-1]["at"]
|
|
context["max_date"] = changes[0]["at"]
|
|
# TODO: offset
|
|
return context
|
|
|
|
|
|
# ------------------------
|
|
# Consommation personnelle
|
|
# ------------------------
|
|
|
|
|
|
@method_decorator(login_required, name="dispatch")
|
|
class AccountStatOperationList(UserAccountMixin, SingleResumeStat):
|
|
"""
|
|
Menu général pour l'historique de consommation d'un compte
|
|
"""
|
|
|
|
model = Account
|
|
slug_url_kwarg = "trigramme"
|
|
slug_field = "trigramme"
|
|
url_stat = "kfet.account.stat.operation"
|
|
|
|
def get_stats(self):
|
|
scales_def = [
|
|
(
|
|
"Tout le temps",
|
|
MonthScale,
|
|
{"last": True, "begin": self.object.created_at.replace(tzinfo=None)},
|
|
False,
|
|
),
|
|
("1 an", MonthScale, {"last": True, "n_steps": 12}, False),
|
|
("3 mois", WeekScale, {"last": True, "n_steps": 13}, True),
|
|
("2 semaines", DayScale, {"last": True, "n_steps": 14}, False),
|
|
]
|
|
|
|
return scale_url_params(scales_def)
|
|
|
|
|
|
@method_decorator(login_required, name="dispatch")
|
|
class AccountStatOperation(UserAccountMixin, ScaleMixin, JSONDetailView):
|
|
"""
|
|
Statistiques (JSON) de consommation (nb d'items achetés) d'un compte.
|
|
"""
|
|
|
|
model = Account
|
|
slug_url_kwarg = "trigramme"
|
|
slug_field = "trigramme"
|
|
|
|
def get_context_data(self, *args, **kwargs):
|
|
context = super().get_context_data(*args, **kwargs)
|
|
|
|
operations = (
|
|
Operation.objects.filter(
|
|
type=Operation.PURCHASE, group__on_acc=self.object, canceled_at=None
|
|
)
|
|
.values("article_nb", "group__at")
|
|
.order_by("group__at")
|
|
)
|
|
# On compte les opérations
|
|
nb_ventes = self.scale.chunkify_qs(
|
|
operations, field="group__at", aggregate=Sum("article_nb")
|
|
)
|
|
|
|
context["charts"] = [
|
|
{
|
|
"color": "rgb(200, 20, 60)",
|
|
"label": "NB items achetés",
|
|
"values": nb_ventes,
|
|
}
|
|
]
|
|
return context
|
|
|
|
|
|
# ------------------------
|
|
# Article Statistiques Last
|
|
# ------------------------
|
|
|
|
|
|
@method_decorator(teamkfet_required, name="dispatch")
|
|
class ArticleStatSalesList(SingleResumeStat):
|
|
"""
|
|
Menu pour les statistiques de vente d'un article.
|
|
"""
|
|
|
|
model = Article
|
|
nb_default = 2
|
|
url_stat = "kfet.article.stat.sales"
|
|
|
|
def get_stats(self):
|
|
first_conso = (
|
|
Operation.objects.filter(article=self.object)
|
|
.order_by("group__at")
|
|
.values_list("group__at", flat=True)
|
|
.first()
|
|
)
|
|
if first_conso is None:
|
|
# On le crée dans le passé au cas où
|
|
first_conso = timezone.now() - timedelta(seconds=1)
|
|
scales_def = [
|
|
(
|
|
"Tout le temps",
|
|
MonthScale,
|
|
{"last": True, "begin": first_conso.strftime("%Y-%m-%d %H:%M:%S")},
|
|
False,
|
|
),
|
|
("1 an", MonthScale, {"last": True, "n_steps": 12}, False),
|
|
("3 mois", WeekScale, {"last": True, "n_steps": 13}, True),
|
|
("2 semaines", DayScale, {"last": True, "n_steps": 14}, False),
|
|
]
|
|
|
|
return scale_url_params(scales_def)
|
|
|
|
|
|
@method_decorator(teamkfet_required, name="dispatch")
|
|
class ArticleStatSales(ScaleMixin, JSONDetailView):
|
|
"""
|
|
Statistiques (JSON) de vente d'un article.
|
|
Sépare LIQ et les comptes K-Fêt, et rajoute le total.
|
|
"""
|
|
|
|
model = Article
|
|
context_object_name = "article"
|
|
|
|
def get_context_data(self, *args, **kwargs):
|
|
context = super().get_context_data(*args, **kwargs)
|
|
scale = self.scale
|
|
|
|
all_purchases = (
|
|
Operation.objects.filter(
|
|
type=Operation.PURCHASE, article=self.object, canceled_at=None
|
|
)
|
|
.values("group__at", "article_nb")
|
|
.order_by("group__at")
|
|
)
|
|
cof_accts = all_purchases.filter(group__on_acc__cofprofile__is_cof=True)
|
|
noncof_accts = all_purchases.exclude(group__on_acc__cofprofile__is_cof=True)
|
|
|
|
nb_cof = scale.chunkify_qs(
|
|
cof_accts, field="group__at", aggregate=Sum("article_nb")
|
|
)
|
|
nb_noncof = scale.chunkify_qs(
|
|
noncof_accts, field="group__at", aggregate=Sum("article_nb")
|
|
)
|
|
nb_ventes = [n1 + n2 for n1, n2 in zip(nb_cof, nb_noncof)]
|
|
|
|
context["charts"] = [
|
|
{
|
|
"color": "rgb(200, 20, 60)",
|
|
"label": "Toutes consommations",
|
|
"values": nb_ventes,
|
|
},
|
|
{"color": "rgb(54, 162, 235)", "label": "Comptes K-Fêt", "values": nb_cof},
|
|
{
|
|
"color": "rgb(255, 205, 86)",
|
|
"label": "LIQ",
|
|
"values": nb_noncof,
|
|
},
|
|
]
|
|
return context
|
|
|
|
|
|
# ---
|
|
# Autocompletion views
|
|
# ---
|
|
|
|
|
|
class AccountCreateAutocompleteView(PermissionRequiredMixin, AutocompleteView):
|
|
template_name = "kfet/search_results.html"
|
|
permission_required = "kfet.is_team"
|
|
search_composer = kfet_autocomplete
|
|
|
|
|
|
class AccountSearchAutocompleteView(PermissionRequiredMixin, AutocompleteView):
|
|
permission_required = "kfet.is_team"
|
|
search_composer = kfet_account_only_autocomplete
|