gestioCOF/kfet/views.py
2017-05-16 17:08:51 +02:00

2615 lines
94 KiB
Python

# -*- coding: utf-8 -*-
import ast
from urllib.parse import urlencode
from django.shortcuts import render, get_object_or_404, redirect
from django.core.exceptions import PermissionDenied
from django.core.cache import cache
from django.views.generic import ListView, DetailView, TemplateView, FormView
from django.views.generic.detail import BaseDetailView
from django.views.generic.edit import CreateView, UpdateView
from django.core.urlresolvers import reverse, reverse_lazy
from django.contrib import messages
from django.contrib.messages.views import SuccessMessageMixin
from django.contrib.auth import authenticate, login
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.models import User, Permission, Group
from django.http import JsonResponse, Http404
from django.forms import formset_factory
from django.db import transaction
from django.db.models import Q, F, Sum, Prefetch, Count
from django.db.models.functions import Coalesce
from django.utils import timezone
from django.utils.crypto import get_random_string
from django.utils.decorators import method_decorator
from gestioncof.models import CofProfile
from kfet.config import kfet_config
from kfet.decorators import teamkfet_required
from kfet.models import (
Account, Checkout, Article, AccountNegative,
CheckoutStatement, GenericTeamToken, Supplier, SupplierArticle, Inventory,
InventoryArticle, Order, OrderArticle, Operation, OperationGroup,
TransferGroup, Transfer, ArticleCategory)
from kfet.forms import (
AccountTriForm, AccountBalanceForm, AccountNoTriForm, UserForm, CofForm,
UserRestrictTeamForm, UserGroupForm, AccountForm, CofRestrictForm,
AccountPwdForm, AccountNegativeForm, UserRestrictForm, AccountRestrictForm,
GroupForm, CheckoutForm, CheckoutRestrictForm, CheckoutStatementCreateForm,
CheckoutStatementUpdateForm, ArticleForm, ArticleRestrictForm,
KPsulOperationGroupForm, KPsulAccountForm, KPsulCheckoutForm,
KPsulOperationFormSet, AddcostForm, FilterHistoryForm,
TransferFormSet, InventoryArticleForm, OrderArticleForm,
OrderArticleToInventoryForm, CategoryForm, KFetConfigForm
)
from collections import defaultdict
from kfet import consumers
from datetime import timedelta
from decimal import Decimal
import django_cas_ng
import heapq
import statistics
from kfet.statistic import ScaleMixin, last_stats_manifest, tot_ventes, WeekScale
# source : docs.djangoproject.com/fr/1.10/topics/class-based-views/mixins/
class JSONResponseMixin(object):
    """
    Mixin that lets a view answer with a JSON payload.

    Adapted from the Django class-based views "mixins" documentation.
    """

    def get_data(self, context):
        """
        Build the object that will be handed to the JSON serializer.

        The context is passed through untouched, so callers must only put
        JSON-serializable values in it (no model instances or querysets).
        """
        return context

    def render_to_json_response(self, context, **response_kwargs):
        """
        Wrap the payload derived from 'context' in a JsonResponse.

        Extra keyword arguments are forwarded to JsonResponse.
        """
        payload = self.get_data(context)
        return JsonResponse(payload, **response_kwargs)
class Home(TemplateView):
    """
    Home page listing the articles that are sold and not hidden.

    Articles whose category is named 'Pression' are exposed to the template
    as ``pressions``; every other article goes into ``articles``.
    Access requires an authenticated user.
    """
    template_name = "kfet/home.html"

    def get_context_data(self, **kwargs):
        """Add the ``pressions`` and ``articles`` lists to the context."""
        context = super().get_context_data(**kwargs)
        articles = list(
            Article.objects
            .filter(is_sold=True, hidden=False)
            .select_related('category')
            .order_by('category__name')
        )
        pressions, others = [], []
        # The list is walked from its end (descending category name), which
        # is the order the former pop()-based loop produced.
        for article in reversed(articles):
            if article.category.name == 'Pression':
                pressions.append(article)
            else:
                others.append(article)
        context['pressions'] = pressions
        context['articles'] = others
        return context

    @method_decorator(login_required)
    def dispatch(self, *args, **kwargs):
        """Dispatch the request once the user is known to be logged in."""
        # super() must start from this class; naming TemplateView skipped
        # TemplateView itself in the method resolution order.
        return super().dispatch(*args, **kwargs)
@teamkfet_required
def login_genericteam(request):
    """
    Log the current session in as the generic team user.

    A one-time GenericTeamToken is created and used to authenticate the
    "kfet_genericteam" user. When the profile of the user making the request
    has a clipper login, the CAS logout response is returned instead of the
    confirmation page.
    """
    profile, _ = CofProfile.objects.get_or_create(user=request.user)

    # A clipper login means the user also has to be logged out from CAS
    must_leave_cas = bool(profile.login_clipper)
    if must_leave_cas:
        # Build the CAS logout response right now: `request` gets modified
        # by the login() call below
        cas_logout_response = django_cas_ng.views.logout(request)

    # Authenticate as the generic account through a fresh random token
    team_token = GenericTeamToken.objects.create(token=get_random_string(50))
    generic_user = authenticate(
        username="kfet_genericteam", token=team_token.token)
    login(request, generic_user)

    if must_leave_cas:
        return cas_logout_response
    return render(request, "kfet/login_genericteam.html")
def put_cleaned_data_in_dict(dict, form):
    """
    Copy every entry of ``form.cleaned_data`` into ``dict`` (in place).

    Existing keys are overwritten; nothing is returned.
    """
    dict.update(form.cleaned_data)
# -----
# Account views
# -----
# Account - General
@login_required
@teamkfet_required
def account(request):
    """Render the list of all accounts, ordered by trigramme."""
    queryset = Account.objects.select_related('cofprofile__user')
    context = {'accounts': queryset.order_by('trigramme')}
    return render(request, "kfet/account.html", context)
@login_required
@teamkfet_required
def account_is_validandfree_ajax(request):
    """
    Answer, as JSON, with the result of Account.is_validandfree for the
    ``trigramme`` GET parameter.

    Raises Http404 when the parameter is missing or empty.
    """
    trigramme = request.GET.get("trigramme", '')
    if not trigramme:
        raise Http404
    return JsonResponse(Account.is_validandfree(trigramme))
# Account - Create
@login_required
@teamkfet_required
def account_create_special(request):
    """
    Create an account together with an initial balance.

    On POST, the trigramme, balance, user, COF and account forms are all
    validated; the requesting user must hold 'kfet.special_add_account'.
    The account, its balance and an Operation.INITIAL operation (attached
    to the checkout named 'Initial') are then written inside a single
    database transaction, so a failure in a later step does not leave a
    half-created account behind.

    On any other method, only the trigramme form (prefilled from the
    ``trigramme`` GET parameter) and an empty balance form are displayed.
    """
    if request.method == "POST":
        trigramme_form = AccountTriForm(request.POST, initial={'balance': 0})
        balance_form = AccountBalanceForm(request.POST)

        # Populate the remaining forms from the submitted data
        username = request.POST.get('username')
        login_clipper = request.POST.get('login_clipper')
        forms = get_account_create_forms(
            request, username=username, login_clipper=login_clipper)
        account_form = forms['account_form']
        cof_form = forms['cof_form']
        user_form = forms['user_form']

        # The tuple is built first so that every form is validated
        if all((user_form.is_valid(), cof_form.is_valid(),
                trigramme_form.is_valid(), account_form.is_valid(),
                balance_form.is_valid())):
            # Checking permission
            if not request.user.has_perm('kfet.special_add_account'):
                messages.error(request, 'Permission refusée')
            else:
                data = {}
                # Fill data for Account.save()
                put_cleaned_data_in_dict(data, user_form)
                put_cleaned_data_in_dict(data, cof_form)

                try:
                    # All-or-nothing: roll everything back if any step fails
                    # (e.g. the 'Initial' checkout does not exist)
                    with transaction.atomic():
                        account = trigramme_form.save(data=data)
                        account_form = AccountNoTriForm(
                            request.POST, instance=account)
                        account_form.save()
                        balance_form = AccountBalanceForm(
                            request.POST, instance=account)
                        balance_form.save()
                        amount = balance_form.cleaned_data['balance']
                        checkout = Checkout.objects.get(name='Initial')
                        opegroup = OperationGroup.objects.create(
                            on_acc=account,
                            checkout=checkout,
                            amount=amount,
                            is_cof=account.is_cof)
                        Operation.objects.create(
                            group=opegroup,
                            type=Operation.INITIAL,
                            amount=amount)
                    messages.success(
                        request, 'Compte créé : %s' % account.trigramme)
                    return redirect('kfet.account.create')
                except Account.UserHasAccount as e:
                    messages.error(
                        request,
                        "Cet utilisateur a déjà un compte K-Fêt : %s"
                        % e.trigramme)
    else:
        initial = {'trigramme': request.GET.get('trigramme', '')}
        trigramme_form = AccountTriForm(initial=initial)
        balance_form = AccountBalanceForm(initial={'balance': 0})
        account_form = None
        cof_form = None
        user_form = None

    return render(request, "kfet/account_create_special.html", {
        'trigramme_form': trigramme_form,
        'account_form': account_form,
        'cof_form': cof_form,
        'user_form': user_form,
        'balance_form': balance_form,
    })
# Account - Create
@login_required
@teamkfet_required
def account_create(request):
    """
    Display and process the standard account creation page.

    On POST every form is validated, the 'kfet.add_account' permission is
    checked, and the account is saved; success redirects back to the
    creation page. Otherwise the page is rendered with the current forms.
    """
    if request.method != "POST":
        # Plain display: only the trigramme form exists at this stage
        prefill = {'trigramme': request.GET.get('trigramme', '')}
        trigramme_form = AccountTriForm(initial=prefill)
        account_form = cof_form = user_form = None
    else:
        trigramme_form = AccountTriForm(request.POST)

        # Populate the remaining forms from the submitted data
        posted_username = request.POST.get('username')
        posted_clipper = request.POST.get('login_clipper')
        forms = get_account_create_forms(
            request, username=posted_username, login_clipper=posted_clipper)
        account_form = forms['account_form']
        cof_form = forms['cof_form']
        user_form = forms['user_form']

        # Every form is validated (no short-circuit) before looking at
        # the overall outcome
        validations = [
            user_form.is_valid(),
            cof_form.is_valid(),
            trigramme_form.is_valid(),
            account_form.is_valid(),
        ]
        if all(validations):
            if request.user.has_perm('kfet.add_account'):
                # Gather the data expected by Account.save()
                data = {}
                put_cleaned_data_in_dict(data, user_form)
                put_cleaned_data_in_dict(data, cof_form)
                try:
                    account = trigramme_form.save(data=data)
                    account_form = AccountNoTriForm(
                        request.POST, instance=account)
                    account_form.save()
                    messages.success(
                        request, 'Compte créé : %s' % account.trigramme)
                    return redirect('kfet.account.create')
                except Account.UserHasAccount as e:
                    messages.error(
                        request,
                        "Cet utilisateur a déjà un compte K-Fêt : %s"
                        % e.trigramme)
            else:
                messages.error(request, 'Permission refusée')

    context = {
        'trigramme_form': trigramme_form,
        'account_form': account_form,
        'cof_form': cof_form,
        'user_form': user_form,
    }
    return render(request, "kfet/account_create.html", context)
def account_form_set_readonly_fields(user_form, cof_form):
    """
    Flag, on the widgets, the fields that must not be edited.

    ``username`` and ``login_clipper`` get the ``readonly`` attribute,
    ``is_cof`` gets the ``disabled`` attribute.
    """
    user_form.fields['username'].widget.attrs['readonly'] = True
    cof_fields = cof_form.fields
    cof_fields['login_clipper'].widget.attrs['readonly'] = True
    cof_fields['is_cof'].widget.attrs['disabled'] = True
def get_account_create_forms(request=None, username=None, login_clipper=None,
                             fullname=None):
    """
    Build the user, COF and account forms used by the creation views.

    Three situations are handled:
    - a clipper login with no matching User: forms are prefilled from the
      clipper login (and from ``fullname`` when given);
    - a username matching an existing User: forms are bound to the existing
      User and its CofProfile;
    - anything else: blank forms.

    When ``request`` is given, every form is additionally bound to
    ``request.POST``.

    Returns a dict with the keys 'account_form', 'cof_form' and 'user_form'.
    """
    user = None
    clipper = False
    if login_clipper and (login_clipper == username or not username):
        # Starting from a clipper login:
        # the matching user is not supposed to exist yet
        clipper = True
        try:
            # Make sure the clipper login is not already a User
            user = User.objects.get(username=login_clipper)
            # The user does exist after all: fall back to the username path
            username = user.username
            clipper = False
        except User.DoesNotExist:
            # Clipper login without an existing user

            # UserForm - prefill
            user_initial = {
                'username': login_clipper,
                'email': "%s@clipper.ens.fr" % login_clipper}
            # A whitespace-only fullname yields no name at all; guard on the
            # split result so names[0] cannot raise IndexError
            names = fullname.split() if fullname else []
            if names:
                # The first word is the first name
                user_initial['first_name'] = names[0]
                if len(names) > 1:
                    # Any other word goes into the last name
                    user_initial['last_name'] = " ".join(names[1:])
            # CofForm - prefill
            cof_initial = {'login_clipper': login_clipper}

            # Forms creation
            if request:
                user_form = UserForm(
                    request.POST, initial=user_initial, from_clipper=True)
                cof_form = CofForm(request.POST, initial=cof_initial)
            else:
                user_form = UserForm(initial=user_initial, from_clipper=True)
                cof_form = CofForm(initial=cof_initial)

            # Protect (read-only) the username and login_clipper fields
            account_form_set_readonly_fields(user_form, cof_form)

    if username and not clipper:
        try:
            user = User.objects.get(username=username)
            # The user already exists: fetch (or create) its COF profile
            (cof, _) = CofProfile.objects.get_or_create(user=user)
            # UserForm + CofForm - built from the existing instances
            if request:
                user_form = UserForm(request.POST, instance=user)
                cof_form = CofForm(request.POST, instance=cof)
            else:
                user_form = UserForm(instance=user)
                cof_form = CofForm(instance=cof)
            # Protect (read-only) username, login_clipper and is_cof
            account_form_set_readonly_fields(user_form, cof_form)
        except User.DoesNotExist:
            # Unknown username -> creation from scratch
            # (possibly a resubmission after earlier errors)
            pass

    if not user and not clipper:
        # Nothing is known: every field has to be filled in
        if request:
            user_form = UserForm(request.POST)
            cof_form = CofForm(request.POST)
        else:
            user_form = UserForm()
            cof_form = CofForm()
        # ... but the username stays writable
        cof_form.fields['login_clipper'].widget.attrs['readonly'] = True
        cof_form.fields['is_cof'].widget.attrs['disabled'] = True

    if request:
        account_form = AccountNoTriForm(request.POST)
    else:
        account_form = AccountNoTriForm()

    return {
        'account_form': account_form,
        'cof_form': cof_form,
        'user_form': user_form,
    }
@login_required
@teamkfet_required
def account_create_ajax(request, username=None, login_clipper=None,
                        fullname=None):
    """
    Render the unbound account creation forms for the given username,
    clipper login and full name.
    """
    # request=None: the forms must not be bound to submitted data
    context = get_account_create_forms(
        request=None,
        username=username,
        login_clipper=login_clipper,
        fullname=fullname,
    )
    return render(request, "kfet/account_create_form.html", {
        'account_form': context['account_form'],
        'cof_form': context['cof_form'],
        'user_form': context['user_form'],
    })
# Account - Read
@login_required
def account_read(request, trigramme):
    """
    Show one account, either as an HTML page or as JSON (``?format=json``).

    Only users holding 'kfet.is_team' or the owner of the account may see
    it; anyone else gets PermissionDenied. An unknown trigramme gives a 404.
    """
    account = get_object_or_404(Account, trigramme=trigramme)

    # Access is limited to the team and to the account owner
    allowed = (request.user.has_perm('kfet.is_team')
               or request.user == account.user)
    if not allowed:
        raise PermissionDenied

    if request.GET.get('format') == 'json':
        exported = {}
        for key in ('id', 'trigramme', 'first_name', 'last_name', 'name',
                    'email', 'is_cof', 'promo', 'balance', 'is_frozen',
                    'departement', 'nickname'):
            exported[key] = getattr(account, key)
        return JsonResponse(exported)

    # Per-day sums of the non-canceled addcosts credited to this account
    addcost_groups = OperationGroup.objects.filter(
        opes__addcost_for=account,
        opes__canceled_at=None,
    )
    addcosts = (
        addcost_groups
        .extra({'date': "date(at)"})
        .values('date')
        .annotate(sum_addcosts=Sum('opes__addcost_amount'))
        .order_by('-date')
    )

    context = {'account': account, 'addcosts': addcosts}
    return render(request, "kfet/account_read.html", context)
# Account - Update
@login_required
def account_update(request, trigramme):
    """
    Display and process the edition page of an account.

    Access is limited to users holding 'kfet.is_team' and to the owner of
    the account (PermissionDenied otherwise). Team members get the full set
    of forms (user, groups, account, COF, password, negative); other users
    only get the restricted user and account forms.

    On POST, the team section and the "own information" section are
    processed one after the other; a successful update redirects to the
    account page, otherwise the page is rendered again with the bound forms.
    """
    account = get_object_or_404(Account, trigramme=trigramme)

    # Checking permissions
    if not request.user.has_perm('kfet.is_team') \
            and request.user != account.user:
        raise PermissionDenied

    if request.user.has_perm('kfet.is_team'):
        user_form = UserRestrictTeamForm(instance=account.user)
        group_form = UserGroupForm(instance=account.user)
        account_form = AccountForm(instance=account)
        cof_form = CofRestrictForm(instance=account.cofprofile)
        pwd_form = AccountPwdForm()
        # A negative balance without an AccountNegative row gets one now
        if account.balance < 0 and not hasattr(account, 'negative'):
            AccountNegative.objects.create(account=account,
                                           start=timezone.now())
            account.refresh_from_db()
        if hasattr(account, 'negative'):
            negative_form = AccountNegativeForm(instance=account.negative)
        else:
            negative_form = None
    else:
        user_form = UserRestrictForm(instance=account.user)
        account_form = AccountRestrictForm(instance=account)
        cof_form = None
        group_form = None
        negative_form = None
        pwd_form = None

    if request.method == "POST":
        # Update attempt
        success = False
        missing_perm = True

        if request.user.has_perm('kfet.is_team'):
            account_form = AccountForm(request.POST, instance=account)
            cof_form = CofRestrictForm(request.POST,
                                       instance=account.cofprofile)
            user_form = UserRestrictTeamForm(request.POST,
                                             instance=account.user)
            group_form = UserGroupForm(request.POST, instance=account.user)
            pwd_form = AccountPwdForm(request.POST)
            if hasattr(account, 'negative'):
                negative_form = AccountNegativeForm(request.POST,
                                                    instance=account.negative)

            if (request.user.has_perm('kfet.change_account')
                    and account_form.is_valid() and cof_form.is_valid()
                    and user_form.is_valid()):
                missing_perm = False
                data = {}
                # Fill data for Account.save()
                put_cleaned_data_in_dict(data, user_form)
                put_cleaned_data_in_dict(data, cof_form)

                # Updating
                account_form.save(data=data)

                # Checking perm to update password
                if (request.user.has_perm('kfet.change_account_password')
                        and pwd_form.is_valid()):
                    pwd = pwd_form.cleaned_data['pwd1']
                    account.change_pwd(pwd)
                    account.save()
                    messages.success(request, 'Mot de passe mis à jour')

                # Checking perm to manage perms
                if (request.user.has_perm('kfet.manage_perms')
                        and group_form.is_valid()):
                    group_form.save()

                # Checking perm to manage negative
                # Remember the current offset before the form overwrites it
                if hasattr(account, 'negative'):
                    balance_offset_old = 0
                    if account.negative.balance_offset:
                        balance_offset_old = account.negative.balance_offset
                if (hasattr(account, 'negative')
                        and request.user.has_perm('kfet.change_accountnegative')
                        and negative_form.is_valid()):
                    balance_offset_new = \
                        negative_form.cleaned_data['balance_offset']
                    if not balance_offset_new:
                        balance_offset_new = 0
                    balance_offset_diff = (balance_offset_new
                                           - balance_offset_old)
                    # The balance is shifted by the offset change with a
                    # database-side F() expression
                    Account.objects.filter(pk=account.pk).update(
                        balance=F('balance') + balance_offset_diff)
                    negative_form.save()
                    # Drop the AccountNegative row once the stored balance
                    # is non-negative and no offset remains
                    if Account.objects.get(pk=account.pk).balance >= 0 \
                            and not balance_offset_new:
                        AccountNegative.objects.get(account=account).delete()

                success = True
                messages.success(
                    request,
                    'Informations du compte %s mises à jour'
                    % account.trigramme)

        # The user is editing their own information
        if request.user == account.user:
            missing_perm = False
            account.refresh_from_db()
            user_form = UserRestrictForm(request.POST, instance=account.user)
            account_form = AccountRestrictForm(request.POST, instance=account)
            pwd_form = AccountPwdForm(request.POST)

            if user_form.is_valid() and account_form.is_valid():
                user_form.save()
                account_form.save()
                success = True
                messages.success(request,
                                 'Vos informations ont été mises à jour')

            # Changing one's own password still requires 'kfet.is_team'
            if request.user.has_perm('kfet.is_team') \
                    and pwd_form.is_valid():
                pwd = pwd_form.cleaned_data['pwd1']
                account.change_pwd(pwd)
                account.save()
                messages.success(
                    request, 'Votre mot de passe a été mis à jour')

        if missing_perm:
            messages.error(request, 'Permission refusée')
        if success:
            return redirect('kfet.account.read', account.trigramme)
        else:
            messages.error(
                request, 'Informations non mises à jour. Corrigez les erreurs')

    return render(request, "kfet/account_update.html", {
        'account': account,
        'account_form': account_form,
        'cof_form': cof_form,
        'user_form': user_form,
        'group_form': group_form,
        'negative_form': negative_form,
        'pwd_form': pwd_form,
    })
@permission_required('kfet.manage_perms')
def account_group(request):
    """
    List the groups whose name contains 'K-Fêt', with their permissions
    and their users prefetched.
    """
    users_with_accounts = User.objects.select_related('profile__account_kfet')
    kfet_groups = Group.objects.filter(name__icontains='K-Fêt')
    groups = kfet_groups.prefetch_related(
        'permissions',
        Prefetch('user_set', queryset=users_with_accounts),
    )
    return render(request, 'kfet/account_group.html', {'groups': groups})
class AccountGroupCreate(SuccessMessageMixin, CreateView):
    """Create a Group through GroupForm, then go back to the group list."""
    model = Group
    form_class = GroupForm
    template_name = 'kfet/account_group_form.html'
    success_url = reverse_lazy('kfet.account.group')
    success_message = 'Nouveau groupe : %(name)s'
class AccountGroupUpdate(SuccessMessageMixin, UpdateView):
    """
    Edit a Group whose name contains 'K-Fêt', then go back to the group
    list.

    SuccessMessageMixin is required for ``success_message`` to be emitted;
    without it the attribute was silently ignored.
    """
    queryset = Group.objects.filter(name__icontains='K-Fêt')
    template_name = 'kfet/account_group_form.html'
    form_class = GroupForm
    success_message = 'Groupe modifié : %(name)s'
    success_url = reverse_lazy('kfet.account.group')
class AccountNegativeList(ListView):
    """
    List the AccountNegative entries, leaving out the account whose
    trigramme is '#13', along with the sum of the real balances.
    """
    queryset = (
        AccountNegative.objects
        .select_related('account', 'account__cofprofile__user')
        .exclude(account__trigramme='#13')
    )
    template_name = 'kfet/account_negative.html'
    context_object_name = 'negatives'

    def get_context_data(self, **kwargs):
        """Add ``negatives_sum``: the total of the listed real balances."""
        context = super().get_context_data(**kwargs)
        total = 0
        for negative in self.object_list:
            total = total + negative.account.real_balance
        context['negatives_sum'] = total
        return context
# -----
# Checkout views
# -----
# Checkout - General
class CheckoutList(ListView):
    """List every Checkout; exposed to the template as ``checkouts``."""
    model = Checkout
    context_object_name = 'checkouts'
    template_name = 'kfet/checkout.html'
# Checkout - Create
class CheckoutCreate(SuccessMessageMixin, CreateView):
    """
    Create a Checkout along with a first CheckoutStatement recording its
    initial balance.
    """
    model = Checkout
    form_class = CheckoutForm
    template_name = 'kfet/checkout_create.html'
    success_message = 'Nouvelle caisse : %(name)s'

    def form_valid(self, form):
        """
        Save the checkout when the user holds 'kfet.add_checkout';
        otherwise report the error on the form.
        """
        if not self.request.user.has_perm('kfet.add_checkout'):
            form.add_error(None, 'Permission refusée')
            return self.form_invalid(form)

        # The checkout is attributed to the K-Fêt account of the requester
        form.instance.created_by = self.request.user.profile.account_kfet
        new_checkout = form.save()

        # Statement recording the initial balance
        CheckoutStatement.objects.create(
            checkout=new_checkout,
            by=self.request.user.profile.account_kfet,
            balance_old=new_checkout.balance,
            balance_new=new_checkout.balance,
            amount_taken=0,
        )

        return super().form_valid(form)
# Checkout - Read
class CheckoutRead(JSONResponseMixin, DetailView):
    """
    Show one Checkout, as an HTML page or as JSON (``?format=json``).

    With the ``last_statement`` GET parameter, only the latest statement is
    put in the context; otherwise all statements are, newest first.
    """
    model = Checkout
    template_name = 'kfet/checkout_read.html'
    context_object_name = 'checkout'

    def get_context_data(self, **kwargs):
        """Add either ``laststatement`` or ``statements`` to the context."""
        context = super().get_context_data(**kwargs)
        statements = self.object.statements
        if self.request.GET.get('last_statement'):
            context['laststatement'] = statements.latest('at')
        else:
            context['statements'] = statements.order_by('-at')
        return context

    def render_to_response(self, context, **kwargs):
        """Return JSON when asked for it, the HTML template otherwise."""
        if self.request.GET.get('format') != 'json':
            return super().render_to_response(context, **kwargs)

        payload = {}
        for key in ('id', 'name', 'balance', 'valid_from', 'valid_to'):
            payload[key] = getattr(self.object, key)

        if 'laststatement' in context:
            statement = context['laststatement']
            statement_payload = {}
            for key in ('id', 'at', 'balance_new', 'balance_old'):
                statement_payload[key] = getattr(statement, key)
            # The author is exported through its string representation
            statement_payload['by'] = str(statement.by)
            payload['laststatement'] = statement_payload

        return self.render_to_json_response(payload)
# Checkout - Update
class CheckoutUpdate(SuccessMessageMixin, UpdateView):
    """Edit a Checkout through CheckoutRestrictForm."""
    model = Checkout
    form_class = CheckoutRestrictForm
    template_name = 'kfet/checkout_update.html'
    success_message = 'Informations mises à jour pour la caisse : %(name)s'

    def form_valid(self, form):
        """
        Save the changes when the user holds 'kfet.change_checkout';
        otherwise report the error on the form.
        """
        can_change = self.request.user.has_perm('kfet.change_checkout')
        if can_change:
            return super().form_valid(form)
        form.add_error(None, 'Permission refusée')
        return self.form_invalid(form)
# -----
# Checkout Statement views
# -----
# Checkout Statement - General
class CheckoutStatementList(ListView):
    """
    List every CheckoutStatement, newest first; exposed to the template
    as ``checkoutstatements``.
    """
    model = CheckoutStatement
    context_object_name = 'checkoutstatements'
    template_name = 'kfet/checkoutstatement.html'
    queryset = CheckoutStatement.objects.order_by('-at')
# Checkout Statement - Create
# (attribute name, face value) pairs summed by getAmountTaken
_TAKEN_FACE_VALUES = (
    ('taken_001', Decimal('0.01')), ('taken_002', Decimal('0.02')),
    ('taken_005', Decimal('0.05')), ('taken_01', Decimal('0.1')),
    ('taken_02', Decimal('0.2')), ('taken_05', Decimal('0.5')),
    ('taken_1', Decimal(1)), ('taken_2', Decimal(2)),
    ('taken_5', Decimal(5)), ('taken_10', Decimal(10)),
    ('taken_20', Decimal(20)), ('taken_50', Decimal(50)),
    ('taken_100', Decimal(100)), ('taken_200', Decimal(200)),
    ('taken_500', Decimal(500)),
)


def getAmountTaken(data):
    """
    Return, as a Decimal, the total amount described by ``data``.

    Each ``taken_*`` attribute of ``data`` (presumably the number of coins
    or notes of that denomination) is multiplied by its face value, and
    ``data.taken_cheque`` is added to the result.

    The computation is done with Decimal values only: the former float
    arithmetic produced inexact totals (e.g. 0.1 + 0.2) that were then
    frozen into the returned Decimal.
    """
    total = Decimal(0)
    for attr_name, face_value in _TAKEN_FACE_VALUES:
        total += getattr(data, attr_name) * face_value
    # str() keeps the conversion exact for floats as well as Decimals
    return total + Decimal(str(data.taken_cheque))
# (dictionary key, face value) pairs summed by getAmountBalance
_BALANCE_FACE_VALUES = (
    ('balance_001', Decimal('0.01')), ('balance_002', Decimal('0.02')),
    ('balance_005', Decimal('0.05')), ('balance_01', Decimal('0.1')),
    ('balance_02', Decimal('0.2')), ('balance_05', Decimal('0.5')),
    ('balance_1', Decimal(1)), ('balance_2', Decimal(2)),
    ('balance_5', Decimal(5)), ('balance_10', Decimal(10)),
    ('balance_20', Decimal(20)), ('balance_50', Decimal(50)),
    ('balance_100', Decimal(100)), ('balance_200', Decimal(200)),
    ('balance_500', Decimal(500)),
)


def getAmountBalance(data):
    """
    Return, as a Decimal, the total balance described by the mapping
    ``data``.

    Each ``balance_*`` entry of ``data`` (presumably the number of coins or
    notes of that denomination) is multiplied by its face value.

    The computation is done with Decimal values only: the former float
    arithmetic produced inexact totals (e.g. 3 * 0.1) that were then frozen
    into the returned Decimal.
    """
    total = Decimal(0)
    for key, face_value in _BALANCE_FACE_VALUES:
        total += data[key] * face_value
    return total
class CheckoutStatementCreate(SuccessMessageMixin, CreateView):
    """
    Create a CheckoutStatement for the checkout identified by the
    ``pk_checkout`` URL argument.
    """
    model = CheckoutStatement
    template_name = 'kfet/checkoutstatement_create.html'
    form_class = CheckoutStatementCreateForm
    success_message = 'Nouveau relevé : %(checkout)s - %(at)s'

    def get_success_url(self):
        """Go back to the page of the checkout the statement belongs to."""
        return reverse_lazy(
            'kfet.checkout.read', kwargs={'pk': self.kwargs['pk_checkout']})

    def get_success_message(self, cleaned_data):
        """Format the message with the checkout name and statement date."""
        return self.success_message % dict(
            cleaned_data,
            checkout=self.object.checkout.name,
            at=self.object.at)

    def get_context_data(self, **kwargs):
        """Add the targeted checkout to the context."""
        context = super().get_context_data(**kwargs)
        # An unknown pk_checkout is a client error: answer 404 rather than
        # letting Checkout.DoesNotExist bubble up as a server error
        context['checkout'] = get_object_or_404(
            Checkout, pk=self.kwargs['pk_checkout'])
        return context

    def form_valid(self, form):
        """
        Fill in the computed fields and save the statement when the user
        holds 'kfet.add_checkoutstatement'; otherwise report the error on
        the form.
        """
        # Checking permission
        if not self.request.user.has_perm('kfet.add_checkoutstatement'):
            form.add_error(None, 'Permission refusée')
            return self.form_invalid(form)
        # Creating
        form.instance.amount_taken = getAmountTaken(form.instance)
        # The new balance is only derived from the submitted balance_*
        # values when not_count is falsy
        if not form.instance.not_count:
            form.instance.balance_new = getAmountBalance(form.cleaned_data)
        form.instance.checkout_id = self.kwargs['pk_checkout']
        form.instance.by = self.request.user.profile.account_kfet
        return super().form_valid(form)
class CheckoutStatementUpdate(SuccessMessageMixin, UpdateView):
    """
    Edit a CheckoutStatement; the ``pk_checkout`` URL argument identifies
    the checkout shown alongside and used for the final redirection.
    """
    model = CheckoutStatement
    template_name = 'kfet/checkoutstatement_update.html'
    form_class = CheckoutStatementUpdateForm
    success_message = 'Relevé modifié'

    def get_success_url(self):
        """Go back to the page of the checkout given in the URL."""
        return reverse_lazy(
            'kfet.checkout.read', kwargs={'pk': self.kwargs['pk_checkout']})

    def get_context_data(self, **kwargs):
        """Add the checkout given in the URL to the context."""
        context = super().get_context_data(**kwargs)
        # An unknown pk_checkout is a client error: answer 404 rather than
        # letting Checkout.DoesNotExist bubble up as a server error
        context['checkout'] = get_object_or_404(
            Checkout, pk=self.kwargs['pk_checkout'])
        return context

    def form_valid(self, form):
        """
        Recompute the amount taken and save the statement when the user
        holds 'kfet.change_checkoutstatement'; otherwise report the error
        on the form.
        """
        # Checking permission
        if not self.request.user.has_perm('kfet.change_checkoutstatement'):
            form.add_error(None, 'Permission refusée')
            return self.form_invalid(form)
        # Updating
        form.instance.amount_taken = getAmountTaken(form.instance)
        return super().form_valid(form)
# -----
# Category views
# -----
# Category - General
class CategoryList(ListView):
    """
    List the article categories by name, with their articles prefetched;
    exposed to the template as ``categories``.
    """
    queryset = (
        ArticleCategory.objects.prefetch_related('articles').order_by('name')
    )
    context_object_name = 'categories'
    template_name = 'kfet/category.html'
# Category - Update
class CategoryUpdate(SuccessMessageMixin, UpdateView):
    """Edit an ArticleCategory, then go back to the category list."""
    model = ArticleCategory
    form_class = CategoryForm
    template_name = 'kfet/category_update.html'
    success_message = "Informations mises à jour pour la catégorie : %(name)s"
    success_url = reverse_lazy('kfet.category')

    def form_valid(self, form):
        """
        Save the changes when the user holds 'kfet.change_articlecategory';
        otherwise report the error on the form.
        """
        can_change = self.request.user.has_perm('kfet.change_articlecategory')
        if can_change:
            return super().form_valid(form)
        form.add_error(None, 'Permission refusée')
        return self.form_invalid(form)
# -----
# Article views
# -----
# Article - General
class ArticleList(ListView):
    """
    List the articles ordered by category name, sold ones first, then by
    name.

    The inventories of each article are prefetched, newest first, into the
    ``inventory`` attribute.
    """
    queryset = (
        Article.objects
        .select_related('category')
        .prefetch_related(Prefetch(
            'inventories',
            queryset=Inventory.objects.order_by('-at'),
            to_attr='inventory',
        ))
        .order_by('category__name', '-is_sold', 'name')
    )
    context_object_name = 'articles'
    template_name = 'kfet/article.html'
# Article - Create
class ArticleCreate(SuccessMessageMixin, CreateView):
    """
    Create an Article, link it to its suppliers and record an initial
    inventory holding its starting stock.
    """
    model = Article
    form_class = ArticleForm
    template_name = 'kfet/article_create.html'
    success_message = 'Nouvel item : %(category)s - %(name)s'

    def form_valid(self, form):
        """
        Save the article and its related rows when the user holds
        'kfet.add_article'; otherwise report the error on the form.
        """
        if not self.request.user.has_perm('kfet.add_article'):
            form.add_error(None, 'Permission refusée')
            return self.form_invalid(form)

        # The article is saved here so that supplier links can point to it
        article = form.save()
        cleaned = form.cleaned_data

        # Links to the already existing suppliers that were selected
        for existing_supplier in cleaned['suppliers']:
            SupplierArticle.objects.create(
                article=article, supplier=existing_supplier)

        # Supplier typed in by name: linked only when it was just created
        new_name = cleaned['supplier_new'].strip()
        if new_name:
            new_supplier, created = Supplier.objects.get_or_create(
                name=new_name)
            if created:
                SupplierArticle.objects.create(
                    article=article, supplier=new_supplier)

        # Inventory recording the initial stock
        initial_inventory = Inventory()
        initial_inventory.by = self.request.user.profile.account_kfet
        initial_inventory.save()
        InventoryArticle.objects.create(
            inventory=initial_inventory,
            article=article,
            stock_old=article.stock,
            stock_new=article.stock,
        )

        return super().form_valid(form)
# Article - Read
class ArticleRead(DetailView):
    """
    Show one Article together with its inventory lines and its supplier
    lines, both newest first.
    """
    model = Article
    template_name = 'kfet/article_read.html'
    context_object_name = 'article'

    def get_context_data(self, **kwargs):
        """Add ``inventoryarts`` and ``supplierarts`` to the context."""
        context = super().get_context_data(**kwargs)
        article = self.object
        context['inventoryarts'] = (
            InventoryArticle.objects
            .filter(article=article)
            .select_related('inventory')
            .order_by('-inventory__at')
        )
        context['supplierarts'] = (
            SupplierArticle.objects
            .filter(article=article)
            .select_related('supplier')
            .order_by('-at')
        )
        return context
# Article - Update
class ArticleUpdate(SuccessMessageMixin, UpdateView):
    """
    Edit an Article and synchronise its supplier links with the suppliers
    selected in the form.
    """
    model = Article
    template_name = 'kfet/article_update.html'
    form_class = ArticleRestrictForm
    success_message = "Informations mises à jour pour l'article : %(name)s"

    def form_valid(self, form):
        """
        Save the article and its supplier links when the user holds
        'kfet.change_article'; otherwise report the error on the form.
        """
        # Checking permission
        if not self.request.user.has_perm('kfet.change_article'):
            form.add_error(None, 'Permission refusée')
            return self.form_invalid(form)

        # The article is saved here so that supplier links can point to it
        article = form.save()

        selected = form.cleaned_data['suppliers']
        # Fetched once: the former membership tests re-ran this query on
        # every loop iteration
        linked = set(article.suppliers.all())
        selected_set = set(selected)

        # Link the selected suppliers that are not linked yet
        for supplier in selected:
            if supplier not in linked:
                SupplierArticle.objects.create(
                    article=article, supplier=supplier)

        # Drop the links to the suppliers that were deselected
        for supplier in linked:
            if supplier not in selected_set:
                SupplierArticle.objects.filter(
                    article=article, supplier=supplier).delete()

        # Supplier typed in by name: linked only when it was just created
        supplier_new = form.cleaned_data['supplier_new'].strip()
        if supplier_new:
            supplier, created = Supplier.objects.get_or_create(
                name=supplier_new)
            if created:
                SupplierArticle.objects.create(
                    article=article, supplier=supplier)

        # Updating
        return super().form_valid(form)
# -----
# K-Psul
# -----
@teamkfet_required
def kpsul(request):
    """Render the K-Psul page with its blank forms and empty formset."""
    context = {
        'operationgroup_form': KPsulOperationGroupForm(),
        'trigramme_form': KPsulAccountForm(),
        'checkout_form': KPsulCheckoutForm(),
        'operation_formset': KPsulOperationFormSet(
            queryset=Operation.objects.none(),
        ),
    }
    return render(request, 'kfet/kpsul.html', context)
@teamkfet_required
def kpsul_get_settings(request):
    """
    Return, as JSON, the K-Psul settings read from kfet_config: the COF
    grant, the addcost amount and the trigramme of the addcost recipient
    (empty string when there is none).
    """
    recipient = kfet_config.addcost_for
    recipient_trigramme = ''
    if recipient and recipient.trigramme:
        recipient_trigramme = recipient.trigramme
    return JsonResponse({
        'subvention_cof': kfet_config.subvention_cof,
        'addcost_for': recipient_trigramme,
        'addcost_amount': kfet_config.addcost_amount,
    })
@teamkfet_required
def kpsul_update_addcost(request):
    """
    Update the addcost settings from the posted AddcostForm.

    Answers 400 with the form errors when the form is invalid, 403 with the
    missing permission names when 'kfet.manage_addcosts' is not held.
    Otherwise the configuration is updated, the new values are sent to the
    'kfet.kpsul' group and returned as JSON.
    """
    form = AddcostForm(request.POST)
    if not form.is_valid():
        return JsonResponse(
            {'errors': {'addcost': list(form.errors)}}, status=400)

    required_perms = ['kfet.manage_addcosts']
    if not request.user.has_perms(required_perms):
        missing = get_missing_perms(required_perms, request.user)
        return JsonResponse(
            {'errors': {'missing_perms': missing}}, status=403)

    # An empty trigramme means "no addcost recipient"
    trigramme = form.cleaned_data['trigramme']
    account = None
    if trigramme:
        account = Account.objects.get(trigramme=trigramme)
    amount = form.cleaned_data['amount']

    kfet_config.set(addcost_for=account, addcost_amount=amount)

    recipient_trigramme = None
    if account and account.trigramme:
        recipient_trigramme = account.trigramme
    data = {
        'addcost': {
            'for': recipient_trigramme,
            'amount': amount,
        }
    }
    consumers.KPsul.group_send('kfet.kpsul', data)
    return JsonResponse(data)
def get_missing_perms(required_perms, user):
    """
    Return the names of the permissions of ``required_perms`` that ``user``
    does not hold.

    ``required_perms`` holds 'app_label.codename' strings; the names are
    looked up in Permission by codename.
    """
    missing_codenames = []
    for perm in required_perms:
        if not user.has_perm(perm):
            # Keep the codename, i.e. what follows the app label
            missing_codenames.append(perm.split('.')[1])
    names = (
        Permission.objects
        .filter(codename__in=missing_codenames)
        .values_list('name', flat=True)
    )
    return list(names)
@teamkfet_required
def kpsul_perform_operations(request):
    """Validate and record a group of operations posted from K-Psul.

    The operation group form and the operation formset are read from
    ``request.POST``. Amounts, addcosts and the COF grant are computed, then
    comment and permission requirements are checked. When everything is fine,
    balances, stocks, the group and its operations are saved in a single
    transaction and the result is sent to the ``kfet.kpsul`` websocket group.

    Responses:
      * 400 with ``errors`` when a form is invalid, when a non-purchase
        operation targets the cash account, or when a required comment is
        missing;
      * 403 with ``errors.missing_perms`` and/or ``errors.negative`` when the
        user is not allowed to perform the operations;
      * otherwise the ids of the created operation group and operations.
    """
    # Initializing response data
    data = {'operationgroup': 0, 'operations': [],
            'warnings': {}, 'errors': {}}

    # Checking operationgroup
    operationgroup_form = KPsulOperationGroupForm(request.POST)
    if not operationgroup_form.is_valid():
        data['errors']['operation_group'] = list(operationgroup_form.errors)

    # Checking operation_formset
    operation_formset = KPsulOperationFormSet(request.POST)
    if not operation_formset.is_valid():
        data['errors']['operations'] = list(operation_formset.errors)

    # Returning BAD REQUEST if errors
    if data['errors']:
        return JsonResponse(data, status=400)

    # Pre-saving (no commit)
    operationgroup = operationgroup_form.save(commit=False)
    operations = operation_formset.save(commit=False)

    # Retrieving COF grant
    cof_grant = kfet_config.subvention_cof
    # Retrieving addcosts data
    addcost_amount = kfet_config.addcost_amount
    addcost_for = kfet_config.addcost_for

    # Initializing vars
    required_perms = set()  # Required perms to perform all operations
    cof_grant_divisor = 1 + cof_grant / 100
    to_addcost_for_balance = 0  # For balance of addcost_for
    to_checkout_balance = 0  # For balance of selected checkout
    to_articles_stocks = defaultdict(lambda: 0)  # For stocks articles
    # No addcost when the beneficiary is the account being charged.
    is_addcost = all((addcost_for, addcost_amount,
                      addcost_for != operationgroup.on_acc))
    need_comment = operationgroup.on_acc.need_comment

    # Filling data of each operations
    # + operationgroup + calculating other stuffs
    for operation in operations:
        if operation.type == Operation.PURCHASE:
            operation.amount = - operation.article.price * operation.article_nb
            # Boolean `and` (not bitwise `&`): short-circuits and matches the
            # test done in the COF branch below.
            if is_addcost and operation.article.category.has_addcost:
                operation.addcost_for = addcost_for
                operation.addcost_amount = addcost_amount \
                    * operation.article_nb
                operation.amount -= operation.addcost_amount
                to_addcost_for_balance += operation.addcost_amount
            if operationgroup.on_acc.is_cash:
                to_checkout_balance += -operation.amount
            if operationgroup.on_acc.is_cof:
                if is_addcost and operation.article.category.has_addcost:
                    operation.addcost_amount /= cof_grant_divisor
                operation.amount = operation.amount / cof_grant_divisor
            to_articles_stocks[operation.article] -= operation.article_nb
        else:
            if operationgroup.on_acc.is_cash:
                data['errors']['account'] = 'LIQ'
            if operation.type != Operation.EDIT:
                to_checkout_balance += operation.amount
        operationgroup.amount += operation.amount
        if operation.type == Operation.DEPOSIT:
            required_perms.add('kfet.perform_deposit')
        if operation.type == Operation.EDIT:
            required_perms.add('kfet.edit_balance_account')
            need_comment = True

    if operationgroup.on_acc.is_cof:
        to_addcost_for_balance = to_addcost_for_balance / cof_grant_divisor

    (perms, stop) = (operationgroup.on_acc
                     .perms_to_perform_operation(
                         amount=operationgroup.amount)
                     )
    required_perms |= perms

    if need_comment:
        operationgroup.comment = operationgroup.comment.strip()
        if not operationgroup.comment:
            data['errors']['need_comment'] = True

    if data['errors']:
        return JsonResponse(data, status=400)

    if stop or not request.user.has_perms(required_perms):
        missing_perms = get_missing_perms(required_perms, request.user)
        if missing_perms:
            data['errors']['missing_perms'] = missing_perms
        if stop:
            data['errors']['negative'] = [operationgroup.on_acc.trigramme]
        return JsonResponse(data, status=403)

    # If 1 perm is required, filling who perform the operations
    if required_perms:
        operationgroup.valid_by = request.user.profile.account_kfet
    # Filling cof status for statistics
    operationgroup.is_cof = operationgroup.on_acc.is_cof

    # Starting transaction to ensure data consistency
    with transaction.atomic():
        # If not cash account,
        # saving account's balance and adding to Negative if not in
        on_acc = operationgroup.on_acc
        if not on_acc.is_cash:
            (
                Account.objects
                .filter(pk=on_acc.pk)
                .update(balance=F('balance') + operationgroup.amount)
            )
            on_acc.refresh_from_db()
            on_acc.update_negative()

        # Updating checkout's balance
        if to_checkout_balance:
            Checkout.objects.filter(pk=operationgroup.checkout.pk).update(
                balance=F('balance') + to_checkout_balance)

        # Saving addcost_for with new balance if there is one
        if is_addcost and to_addcost_for_balance:
            Account.objects.filter(pk=addcost_for.pk).update(
                balance=F('balance') + to_addcost_for_balance)

        # Saving operation group
        operationgroup.save()
        data['operationgroup'] = operationgroup.pk

        # Filling operationgroup id for each operations and saving
        for operation in operations:
            operation.group = operationgroup
            operation.save()
            data['operations'].append(operation.pk)

        # Updating articles stock
        for article in to_articles_stocks:
            Article.objects.filter(pk=article.pk).update(
                stock=F('stock') + to_articles_stocks[article])

    # Websocket data
    websocket_data = {}
    websocket_data['opegroups'] = [{
        'add': True,
        'modelname': 'opegroup',
        'content': {
            'id': operationgroup.pk,
            'amount': operationgroup.amount,
            'at': operationgroup.at,
            'is_cof': operationgroup.is_cof,
            'comment': operationgroup.comment,
            'valid_by': (operationgroup.valid_by and
                         operationgroup.valid_by.trigramme or None),
            'trigramme': operationgroup.on_acc.trigramme,
            # Used to filter websocket updates
            'account_id': operationgroup.on_acc.pk,
            'checkout_id': operationgroup.checkout.pk,
            'children': [],
        },
    }]
    for ope in operations:
        ope_data = {
            'content': {
                'id': ope.id,
                'amount': ope.amount,
                'canceled_at': None,
                'canceled_by': None,
            },
        }
        if ope.type == Operation.PURCHASE:
            ope_data['modelname'] = 'purchase'
            ope_data['content'].update({
                'article_name': ope.article.name,
                'article_nb': ope.article_nb,
                'addcost_amount': ope.addcost_amount,
                'addcost_for':
                    ope.addcost_for and ope.addcost_for.trigramme or None,
            })
        else:
            ope_data['modelname'] = 'specialope'
            ope_data['content'].update({
                'type': ope.type,
            })
        websocket_data['opegroups'][0]['content']['children'].append(ope_data)
    # Need refresh from db cause we used update on queryset
    operationgroup.checkout.refresh_from_db()
    websocket_data['checkouts'] = [{
        'id': operationgroup.checkout.pk,
        'balance': operationgroup.checkout.balance,
    }]
    websocket_data['articles'] = []
    # Need refresh from db cause we used update on querysets
    articles_pk = [article.pk for article in to_articles_stocks]
    articles = Article.objects.values('id', 'stock').filter(pk__in=articles_pk)
    for article in articles:
        websocket_data['articles'].append({
            'id': article['id'],
            'stock': article['stock']
        })
    consumers.KPsul.group_send('kfet.kpsul', websocket_data)
    return JsonResponse(data)
@teamkfet_required
def kpsul_cancel_operations(request):
    """Cancel the operations and transfers whose ids are posted.

    Ids are read from the ``opes[]`` and ``transfers[]`` POST lists. Account
    balances, operation group amounts, checkout balances and article stocks
    are reverted in a single transaction, then the changes are sent to the
    ``kfet.kpsul`` websocket group.

    Responses:
      * 400 when an id is not an integer or does not match an existing
        object (``errors.opes_notexisting``/``errors.transfers_notexisting``);
      * 403 with ``errors.missing_perms`` and/or ``errors.negative`` when the
        user is not allowed to cancel;
      * otherwise ``canceled.opes``/``canceled.transfers`` with the cancelled
        ids, and ``warnings.already_canceled`` listing the ids that were
        already cancelled.
    """
    # Response data
    data = {'canceled': {}, 'warnings': {}, 'errors': {}}

    # Checking if BAD REQUEST (opes_pk not int or not existing)
    try:
        # Sets are used to drop duplicates
        opes_post = (
            set(map(int, filter(None, request.POST.getlist('opes[]', []))))
        )
        transfers_post = (
            set(map(int, filter(None, request.POST.getlist('transfers[]', []))))
        )
    except ValueError:
        return JsonResponse(data, status=400)

    opes_all = (
        Operation.objects
        .select_related('group', 'group__on_acc', 'group__on_acc__negative')
        .filter(pk__in=opes_post))
    opes_pk = {ope.pk for ope in opes_all}
    opes_notexisting = [ope for ope in opes_post if ope not in opes_pk]

    transfers_all = (
        Transfer.objects
        .select_related('group', 'from_acc', 'from_acc__negative',
                        'to_acc', 'to_acc__negative')
        .filter(pk__in=transfers_post))
    transfers_pk = {transfer.pk for transfer in transfers_all}
    transfers_notexisting = [transfer for transfer in transfers_post
                             if transfer not in transfers_pk]

    if transfers_notexisting or opes_notexisting:
        if transfers_notexisting:
            data['errors']['transfers_notexisting'] = transfers_notexisting
        if opes_notexisting:
            data['errors']['opes_notexisting'] = opes_notexisting
        return JsonResponse(data, status=400)

    # Already cancelled operations/transfers, by kind ('opes', 'transfers').
    # A defaultdict is needed: appending to a missing key of a plain dict
    # raises KeyError.
    already_canceled = defaultdict(list)
    opes = []  # Not cancelled yet
    transfers = []
    required_perms = set()
    stop_all = False
    cancel_duration = kfet_config.cancel_duration
    # Changes to apply to account balances
    to_accounts_balances = defaultdict(lambda: 0)
    # ------ to operation group amounts
    to_groups_amounts = defaultdict(lambda: 0)
    # ------ to checkout balances
    to_checkouts_balances = defaultdict(lambda: 0)
    # ------ to article stocks
    to_articles_stocks = defaultdict(lambda: 0)

    for ope in opes_all:
        if ope.canceled_at:
            # Operation already cancelled: reported as a warning
            already_canceled['opes'].append(ope.pk)
        else:
            opes.append(ope.pk)
            # Operations older than cancel_duration require a permission
            if ope.group.at + cancel_duration < timezone.now():
                required_perms.add('kfet.cancel_old_operations')

            # Computing every change to apply if the cancellation is accepted

            # Account balances
            if not ope.group.on_acc.is_cash:
                to_accounts_balances[ope.group.on_acc] -= ope.amount
            if ope.addcost_for and ope.addcost_amount:
                to_accounts_balances[ope.addcost_for] -= ope.addcost_amount
            # Operation groups
            to_groups_amounts[ope.group] -= ope.amount

            # Checkout balances
            # The balance of a checkout which got a statement since the
            # operation date must not be modified
            # TODO: Take into account the last statement where the checkout
            #       was counted, and so update balance_old (and amount_error)
            #       of the following statements.
            # Note: When a CheckoutStatement is updated with `.save()`,
            #       amount_error is recomputed automatically, which is not
            #       the case with a queryset update
            # TODO ?: Update balance_old of statements to change the error
            last_statement = \
                (CheckoutStatement.objects
                 .filter(checkout=ope.group.checkout)
                 .order_by('at')
                 .last())
            if not last_statement or last_statement.at < ope.group.at:
                if ope.is_checkout:
                    if ope.group.on_acc.is_cash:
                        to_checkouts_balances[ope.group.checkout] -= - ope.amount
                    else:
                        to_checkouts_balances[ope.group.checkout] -= ope.amount

            # Article stocks
            # The stock of an article which got an inventory since the
            # operation date must not be modified
            # TODO: Take into account the last inventory where the stock was
            #       really counted (not the case of a delivery).
            # Note: if InventoryArticle is updated with .save(), stock_error
            #       is recomputed automatically
            if ope.article and ope.article_nb:
                last_stock = (
                    InventoryArticle.objects
                    .select_related('inventory')
                    .filter(article=ope.article)
                    .order_by('inventory__at')
                    .last()
                )
                if not last_stock or last_stock.inventory.at < ope.group.at:
                    to_articles_stocks[ope.article] += ope.article_nb

    for transfer in transfers_all:
        if transfer.canceled_at:
            # Transfer already cancelled: reported as a warning
            already_canceled['transfers'].append(transfer.pk)
        else:
            transfers.append(transfer.pk)
            # Transfers older than cancel_duration require a permission
            if transfer.group.at + cancel_duration < timezone.now():
                required_perms.add('kfet.cancel_old_operations')

            # Computing every change to apply if the cancellation is accepted

            # Account balances
            to_accounts_balances[transfer.from_acc] += transfer.amount
            to_accounts_balances[transfer.to_acc] += -transfer.amount

    if not opes and not transfers:
        data['warnings']['already_canceled'] = dict(already_canceled)
        return JsonResponse(data)

    negative_accounts = []
    # Checking permissions or stop
    for account in to_accounts_balances:
        (perms, stop) = account.perms_to_perform_operation(
            amount=to_accounts_balances[account])
        required_perms |= perms
        stop_all = stop_all or stop
        if stop:
            negative_accounts.append(account.trigramme)

    if stop_all or not request.user.has_perms(required_perms):
        missing_perms = get_missing_perms(required_perms, request.user)
        if missing_perms:
            data['errors']['missing_perms'] = missing_perms
        if stop_all:
            data['errors']['negative'] = negative_accounts
        return JsonResponse(data, status=403)

    # The canceller is only recorded when a permission was required.
    canceled_by = required_perms and request.user.profile.account_kfet or None
    canceled_at = timezone.now()

    with transaction.atomic():
        (Operation.objects.filter(pk__in=opes)
            .update(canceled_by=canceled_by, canceled_at=canceled_at))
        (Transfer.objects.filter(pk__in=transfers)
            .update(canceled_by=canceled_by, canceled_at=canceled_at))

        for account in to_accounts_balances:
            (
                Account.objects
                .filter(pk=account.pk)
                .update(balance=F('balance') + to_accounts_balances[account])
            )
            if not account.is_cash:
                # Should always be true, but we want to be sure
                account.refresh_from_db()
                account.update_negative()
        for checkout in to_checkouts_balances:
            Checkout.objects.filter(pk=checkout.pk).update(
                balance=F('balance') + to_checkouts_balances[checkout])
        for group in to_groups_amounts:
            OperationGroup.objects.filter(pk=group.pk).update(
                amount=F('amount') + to_groups_amounts[group])
        for article in to_articles_stocks:
            Article.objects.filter(pk=article.pk).update(
                stock=F('stock') + to_articles_stocks[article])

    # Websocket data
    websocket_data = {'opegroups': [], 'opes': [],
                      'checkouts': [], 'articles': []}

    # Need refresh from db cause we used update on querysets
    opegroups_pk = [opegroup.pk for opegroup in to_groups_amounts]
    opegroups = (OperationGroup.objects
                 .values('id', 'amount', 'is_cof')
                 .filter(pk__in=opegroups_pk))
    for opegroup in opegroups:
        websocket_data['opegroups'].append({
            'cancellation': True,
            'id': opegroup['id'],
            'amount': opegroup['amount'],
            'is_cof': opegroup['is_cof'],
        })
    canceled_by = canceled_by and canceled_by.trigramme or None
    for ope in opes:
        websocket_data['opes'].append({
            'cancellation': True,
            'modelname': 'ope',
            'id': ope,
            'canceled_by': canceled_by,
            'canceled_at': canceled_at,
        })
    for ope in transfers:
        websocket_data['opes'].append({
            'cancellation': True,
            'modelname': 'transfer',
            'id': ope,
            'canceled_by': canceled_by,
            'canceled_at': canceled_at,
        })
    # Need refresh from db cause we used update on querysets
    checkouts_pk = [checkout.pk for checkout in to_checkouts_balances]
    checkouts = (Checkout.objects
                 .values('id', 'balance')
                 .filter(pk__in=checkouts_pk))
    for checkout in checkouts:
        websocket_data['checkouts'].append({
            'id': checkout['id'],
            'balance': checkout['balance']})
    # Need refresh from db cause we used update on querysets
    # Every updated article must be listed, hence the comprehension's own
    # loop variable is used.
    articles_pk = [article.pk for article in to_articles_stocks]
    articles = Article.objects.values('id', 'stock').filter(pk__in=articles_pk)
    for article in articles:
        websocket_data['articles'].append({
            'id': article['id'],
            'stock': article['stock']})
    consumers.KPsul.group_send('kfet.kpsul', websocket_data)

    data['canceled']['opes'] = opes
    data['canceled']['transfers'] = transfers
    if already_canceled:
        data['warnings']['already_canceled'] = dict(already_canceled)
    return JsonResponse(data)
@login_required
def history_json(request):
    """Return the history of operations and transfers as JSON.

    GET parameters:
      * ``from`` / ``to``: bounds on the ``at`` field of the groups
        (``at >= from``, ``at < to``);
      * ``checkouts[]``: checkout ids to keep (transfers are then excluded);
      * ``accounts[]``: account ids to keep;
      * ``transfersonly`` / ``opesonly``: keep only one kind of group.

    A user without the ``kfet.is_team`` permission only gets the history of
    their own account, and does not get the ``valid_by``/``canceled_by``
    fields.

    The payload has two keys: ``objects`` (``purchase``, ``specialope`` and
    ``transfer`` lists) and ``related`` (``opegroup`` and ``transfergroup``
    lists).
    """
    # Reading parameters
    from_date = request.GET.get('from', None)
    to_date = request.GET.get('to', None)
    checkouts = request.GET.getlist('checkouts[]', None)
    accounts = request.GET.getlist('accounts[]', None)
    transfers_only = request.GET.get('transfersonly', None)
    opes_only = request.GET.get('opesonly', None)

    # The permission does not change during the request: check it once.
    is_team = request.user.has_perm('kfet.is_team')

    # A user who is not a team member only has access to their own history.
    # The account is reached through `account_kfet`, as everywhere else in
    # this file, and its pk is stored since `accounts` feeds `__id__in`
    # lookups.
    if not is_team:
        accounts = [request.user.profile.account_kfet.pk]

    # Building the queryset (on operations) used for the prefetch
    ope_queryset_prefetch = Operation.objects.select_related(
        'canceled_by', 'addcost_for',
        'article')
    ope_prefetch = Prefetch('opes',
                            queryset=ope_queryset_prefetch)

    transfer_queryset_prefetch = Transfer.objects.select_related(
        'from_acc', 'to_acc', 'canceled_by')

    if accounts:
        transfer_queryset_prefetch = transfer_queryset_prefetch.filter(
            Q(from_acc__id__in=accounts) |
            Q(to_acc__id__in=accounts))

    if not is_team:
        acc = request.user.profile.account_kfet
        transfer_queryset_prefetch = transfer_queryset_prefetch.filter(
            Q(from_acc=acc) | Q(to_acc=acc))

    transfer_prefetch = Prefetch('transfers',
                                 queryset=transfer_queryset_prefetch,
                                 to_attr='filtered_transfers')

    # Building the main querysets
    # NOTE(review): 'trigramme' looks like a plain field rather than a
    # relation; confirm these select_related paths are intended.
    opegroups = (
        OperationGroup.objects
        .prefetch_related(ope_prefetch)
        .select_related('on_acc__trigramme',
                        'valid_by__trigramme')
        .order_by('at')
    )
    transfergroups = (
        TransferGroup.objects
        .prefetch_related(transfer_prefetch)
        .select_related('valid_by__trigramme')
        .order_by('at')
    )

    # Applying filters
    if from_date:
        opegroups = opegroups.filter(at__gte=from_date)
        transfergroups = transfergroups.filter(at__gte=from_date)
    if to_date:
        opegroups = opegroups.filter(at__lt=to_date)
        transfergroups = transfergroups.filter(at__lt=to_date)
    if checkouts:
        opegroups = opegroups.filter(checkout_id__in=checkouts)
        transfergroups = TransferGroup.objects.none()
    if transfers_only:
        opegroups = OperationGroup.objects.none()
    if opes_only:
        transfergroups = TransferGroup.objects.none()
    if accounts:
        opegroups = opegroups.filter(on_acc_id__in=accounts)

    # Building the response
    related_data = defaultdict(list)
    objects_data = defaultdict(list)
    for opegroup in opegroups:
        opegroup_dict = {
            'id': opegroup.id,
            'amount': opegroup.amount,
            'at': opegroup.at,
            'is_cof': opegroup.is_cof,
            'comment': opegroup.comment,
            'trigramme':
                opegroup.on_acc and opegroup.on_acc.trigramme or None,
        }
        if is_team:
            opegroup_dict['valid_by'] = (
                opegroup.valid_by and opegroup.valid_by.trigramme or None)

        for ope in opegroup.opes.all():
            ope_dict = {
                'id': ope.id,
                'amount': ope.amount,
                'canceled_at': ope.canceled_at,
                'is_cof': opegroup.is_cof,
                'trigramme':
                    opegroup.on_acc and opegroup.on_acc.trigramme or None,
                'opegroup__id': opegroup.id,
            }
            if is_team:
                ope_dict['canceled_by'] = (
                    ope.canceled_by and ope.canceled_by.trigramme or None)

            if ope.type == Operation.PURCHASE:
                ope_dict.update({
                    'article_name': ope.article.name,
                    'article_nb': ope.article_nb,
                    'addcost_amount': ope.addcost_amount,
                    'addcost_for':
                        ope.addcost_for and ope.addcost_for.trigramme or None,
                })
                objects_data['purchase'].append(ope_dict)
            else:
                ope_dict.update({
                    'type': ope.type,
                })
                objects_data['specialope'].append(ope_dict)

        related_data['opegroup'].append(opegroup_dict)

    for transfergroup in transfergroups:
        # Groups left without any transfer by the prefetch filters are skipped.
        if transfergroup.filtered_transfers:
            transfergroup_dict = {
                'id': transfergroup.id,
                'at': transfergroup.at,
                'comment': transfergroup.comment,
            }
            if is_team:
                transfergroup_dict['valid_by'] = (
                    transfergroup.valid_by and
                    transfergroup.valid_by.trigramme or
                    None)

            for transfer in transfergroup.filtered_transfers:
                transfer_dict = {
                    'id': transfer.id,
                    'amount': transfer.amount,
                    'canceled_at': transfer.canceled_at,
                    'from_acc': transfer.from_acc.trigramme,
                    'to_acc': transfer.to_acc.trigramme,
                    'transfergroup__id': transfergroup.id,
                }
                if is_team:
                    transfer_dict['canceled_by'] = (
                        transfer.canceled_by and
                        transfer.canceled_by.trigramme or
                        None)
                objects_data['transfer'].append(transfer_dict)
            related_data['transfergroup'].append(transfergroup_dict)

    data = {
        'objects': objects_data,
        'related': related_data,
    }
    return JsonResponse(data)
@teamkfet_required
def kpsul_articles_data(request):
    """Return the sold articles and their categories as JSON.

    ``objects.article`` holds one entry per article with ``is_sold=True``;
    ``related.category`` holds one category entry per article, so a category
    shared by several articles appears several times.
    """
    sold_articles = (
        Article.objects
        .filter(is_sold=True)
        .select_related('category')
    )
    article_rows = []
    category_rows = []
    # TODO: nice queryset, no duplicate categories
    for sold in sold_articles:
        category = sold.category
        article_rows.append({
            'id': sold.id,
            'name': sold.name,
            'price': sold.price,
            'stock': sold.stock,
            'category__id': category.id,
        })
        category_rows.append({
            'id': category.id,
            'name': category.name,
            'has_addcost': category.has_addcost,
        })
    return JsonResponse({
        'objects': {
            'article': article_rows,
        },
        'related': {
            'category': category_rows
        }
    })
@teamkfet_required
def history(request):
    """Render the history page with an unbound filter form."""
    filter_form = FilterHistoryForm()
    return render(request, 'kfet/history.html', {'filter_form': filter_form})
# -----
# Settings views
# -----
class SettingsList(TemplateView):
    """Template view rendering ``kfet/settings.html``."""

    template_name = 'kfet/settings.html'
class SettingsUpdate(SuccessMessageMixin, FormView):
    """Form view saving a ``KFetConfigForm``."""

    template_name = 'kfet/settings_update.html'
    form_class = KFetConfigForm
    success_url = reverse_lazy('kfet.settings')
    success_message = 'Paramètres mis à jour'

    def form_valid(self, form):
        """Save the form if the user has ``kfet.change_settings``.

        Without the permission, a non-field error is added to the form and
        the invalid-form response is returned instead.
        """
        allowed = self.request.user.has_perm('kfet.change_settings')
        if allowed:
            form.save()
            return super().form_valid(form)
        form.add_error(None, 'Permission refusée')
        return self.form_invalid(form)
# -----
# Transfer views
# -----
@teamkfet_required
def transfers(request):
    """Render the transfers page."""
    template_name = 'kfet/transfers.html'
    return render(request, template_name)
@teamkfet_required
def transfers_create(request):
    """Render the transfer creation page with an empty transfer formset."""
    context = {
        # Built on an empty queryset: no existing transfer is listed.
        'transfer_formset': TransferFormSet(queryset=Transfer.objects.none()),
    }
    return render(request, 'kfet/transfers_create.html', context)
@teamkfet_required
def perform_transfers(request):
    """Validate and record a group of transfers posted by the client.

    Responses:
      * 400 with the formset errors when the posted formset is invalid;
      * 403 with ``errors.missing_perms`` and/or ``errors.negative`` when the
        user is not allowed to perform the transfers;
      * otherwise the ids of the created transfer group and transfers, which
        are also sent to the ``kfet.kpsul`` websocket group.
    """
    data = {'errors': {}, 'transfers': [], 'transfergroup': 0}

    # Checking the posted formset
    formset = TransferFormSet(request.POST)
    if not formset.is_valid():
        return JsonResponse({'errors': list(formset.errors)},
                            status=400)

    transfers = formset.save(commit=False)

    # Permissions required to perform all the transfers
    required_perms = {'kfet.add_transfer'}

    # Net balance change of each involved account
    balance_deltas = defaultdict(lambda: 0)
    for transfer in transfers:
        balance_deltas[transfer.from_acc] -= transfer.amount
        balance_deltas[transfer.to_acc] += transfer.amount

    # Checking that the transfers are allowed on every account
    negative_accounts = []
    for account, delta in balance_deltas.items():
        perms, stop = account.perms_to_perform_operation(amount=delta)
        required_perms |= perms
        if stop:
            negative_accounts.append(account.trigramme)
    # One entry is recorded per blocking account.
    stop_all = bool(negative_accounts)

    if stop_all or not request.user.has_perms(required_perms):
        missing_perms = get_missing_perms(required_perms, request.user)
        if missing_perms:
            data['errors']['missing_perms'] = missing_perms
        if stop_all:
            data['errors']['negative'] = negative_accounts
        return JsonResponse(data, status=403)

    # Creating the transfer group
    transfergroup = TransferGroup()
    if required_perms:
        transfergroup.valid_by = request.user.profile.account_kfet
    transfergroup.comment = request.POST.get('comment', '').strip()

    with transaction.atomic():
        # Updating the balance of each account, then its negative status
        for account, delta in balance_deltas.items():
            Account.objects.filter(pk=account.pk).update(
                balance=F('balance') + delta)
            account.refresh_from_db()
            has_negative = hasattr(account, 'negative')
            if account.balance < 0:
                if not has_negative:
                    AccountNegative(
                        account=account, start=timezone.now()).save()
                elif not account.negative.start:
                    account.negative.start = timezone.now()
                    account.negative.save()
            elif has_negative and not account.negative.balance_offset:
                account.negative.delete()

        # Saving the transfer group
        transfergroup.save()
        data['transfergroup'] = transfergroup.pk

        # Saving every transfer within the group
        for transfer in transfers:
            transfer.group = transfergroup
            transfer.save()
            data['transfers'].append(transfer.pk)

    # Websocket data
    children = [
        {
            'modelname': 'transfer',
            'content': {
                'id': transfer.pk,
                'amount': transfer.amount,
                'from_acc': transfer.from_acc.trigramme,
                'to_acc': transfer.to_acc.trigramme,
                'canceled_by__trigramme': None, 'canceled_at': None,
            },
        }
        for transfer in transfers
    ]
    websocket_data = {
        'opegroups': [{
            'add': True,
            'modelname': 'transfergroup',
            'content': {
                'id': transfergroup.pk,
                'at': transfergroup.at,
                'comment': transfergroup.comment,
                'valid_by__trigramme': (
                    transfergroup.valid_by and
                    transfergroup.valid_by.trigramme or None),
                'children': children,
            },
        }],
    }
    consumers.KPsul.group_send('kfet.kpsul', websocket_data)
    return JsonResponse(data)
class InventoryList(ListView):
    """List view of the inventories, ordered by descending ``at``."""

    template_name = 'kfet/inventory.html'
    context_object_name = 'inventories'
    # Each inventory is annotated with `nb_articles`, the count of its
    # `articles` relation.
    queryset = (
        Inventory.objects
        .select_related('by', 'order')
        .annotate(nb_articles=Count('articles'))
        .order_by('-at')
    )
@teamkfet_required
def inventory_create(request):
    """Display and process the inventory creation formset.

    The formset has one form per article, pre-filled with its current stock.
    On POST, the user must have ``kfet.add_inventory``; an inventory is then
    created with one line per form whose ``stock_new`` is filled, and the
    stock of each of these articles is set to the new value.
    """
    all_articles = (
        Article.objects
        .select_related('category')
        .order_by('category__name', 'name')
    )
    initial = [
        {
            'article': art.pk,
            'stock_old': art.stock,
            'name': art.name,
            'category': art.category_id,
            'category__name': art.category.name,
            'box_capacity': art.box_capacity or 0,
        }
        for art in all_articles
    ]

    cls_formset = formset_factory(form=InventoryArticleForm, extra=0)

    if not request.POST:
        formset = cls_formset(initial=initial)
    else:
        formset = cls_formset(request.POST, initial=initial)

        if not request.user.has_perm('kfet.add_inventory'):
            messages.error(request, 'Permission refusée')
        elif not formset.is_valid():
            messages.error(request, 'Pas marché')
        else:
            with transaction.atomic():
                locked_articles = Article.objects.select_for_update()
                inventory = Inventory()
                inventory.by = request.user.profile.account_kfet
                saved = False
                for form in formset:
                    cleaned = form.cleaned_data
                    if cleaned['stock_new'] is None:
                        # Article not counted: nothing to record.
                        continue
                    # The inventory is only saved once a first article has
                    # been counted.
                    if not saved:
                        inventory.save()
                        saved = True

                    article = locked_articles.get(pk=cleaned['article'].pk)
                    previous_stock = article.stock
                    counted_stock = cleaned['stock_new']
                    InventoryArticle.objects.create(
                        inventory=inventory,
                        article=article,
                        stock_old=previous_stock,
                        stock_new=counted_stock)
                    article.stock = counted_stock
                    article.save()
                if saved:
                    messages.success(request, 'Inventaire créé')
                    return redirect('kfet.inventory')
                messages.warning(request, 'Bah alors ? On a rien compté ?')

    return render(request, 'kfet/inventory_create.html', {
        'formset': formset,
    })
class InventoryRead(DetailView):
    """Detail view of an inventory, with its article lines."""

    model = Inventory
    context_object_name = 'inventory'
    template_name = 'kfet/inventory_read.html'

    def get_context_data(self, **kwargs):
        """Add ``inventoryarts``, the lines of the displayed inventory.

        Lines are ordered by article category name, then article name.
        """
        lines = (
            InventoryArticle.objects
            .select_related('article', 'article__category')
            .filter(inventory=self.object)
            .order_by('article__category__name', 'article__name')
        )
        context = super().get_context_data(**kwargs)
        context['inventoryarts'] = lines
        return context
# -----
# Order views
# -----
class OrderList(ListView):
    """List view of the orders, with the suppliers in the context."""

    template_name = 'kfet/order.html'
    context_object_name = 'orders'
    queryset = Order.objects.select_related('supplier', 'inventory')

    def get_context_data(self, **kwargs):
        """Add ``suppliers``, the suppliers ordered by name."""
        context = super().get_context_data(**kwargs)
        suppliers_by_name = Supplier.objects.order_by('name')
        context['suppliers'] = suppliers_by_name
        return context
@teamkfet_required
def order_create(request, pk):
    """Display and process the order creation formset for a supplier.

    `pk` is the primary key of the supplier (404 if it does not exist).

    For each article of the supplier, the formset is pre-filled with the
    sales over the chunks of a 5-step ``WeekScale`` and a recommended
    quantity ``c_rec``. ``c_rec`` is a number of boxes when the article has a
    ``box_capacity``, a number of units otherwise.

    On POST, the user must have ``kfet.add_order``; an order is then created
    with one line per form whose ``quantity_ordered`` is filled.
    """
    supplier = get_object_or_404(Supplier, pk=pk)

    articles = (
        Article.objects
        .filter(suppliers=supplier.pk)
        .distinct()
        .select_related('category')
        .order_by('category__name', 'name')
    )

    # Force hit to cache
    articles = list(articles)

    sales_q = (
        Operation.objects
        .select_related('group')
        .filter(article__in=articles, canceled_at=None)
        .values('article')
        .annotate(nb=Sum('article_nb'))
    )
    scale = WeekScale(last=True, n_steps=5, std_chunk=False)
    chunks = scale.chunkify_qs(sales_q, field='group__at')

    # One {article pk: sold quantity} mapping per chunk
    sales = [
        {d['article']: d['nb'] for d in chunk}
        for chunk in chunks
    ]

    initial = []

    for article in articles:
        # Get sales for each 5 last weeks
        v_all = [chunk.get(article.pk, 0) for chunk in sales]
        # Take the 3 greatest (eg to avoid 2 weeks of vacations)
        v_3max = heapq.nlargest(3, v_all)
        # Get average and standard deviation
        v_moy = statistics.mean(v_3max)
        v_et = statistics.pstdev(v_3max, v_moy)
        # Expected sales for next week
        v_prev = v_moy + v_et
        # We want to have 1.5 * the expected sales in stock
        # (because sometimes some articles are not delivered)
        c_rec_tot = max(v_prev * 1.5 - article.stock, 0)
        # If ordered quantity is close enough to a level which can led to free
        # boxes, we increase it to this level.
        if article.box_capacity:
            c_rec_temp = c_rec_tot / article.box_capacity
            if c_rec_temp >= 10:
                c_rec = round(c_rec_temp)
            elif c_rec_temp > 5:
                c_rec = 10
            elif c_rec_temp > 2:
                c_rec = 5
            else:
                c_rec = round(c_rec_temp)
        else:
            # No box capacity: the recommendation is a number of units.
            c_rec = round(c_rec_tot)
        initial.append({
            'article': article.pk,
            'name': article.name,
            'category': article.category_id,
            'category__name': article.category.name,
            'stock': article.stock,
            'box_capacity': article.box_capacity,
            'v_all': v_all,
            'v_moy': round(v_moy),
            'v_et': round(v_et),
            'v_prev': round(v_prev),
            # A recommendation of 0 boxes must stay 0: it must not fall back
            # to the unit count, which would then be read as a box count.
            'c_rec': c_rec,
        })

    cls_formset = formset_factory(
        form=OrderArticleForm,
        extra=0,
    )

    if request.POST:
        formset = cls_formset(request.POST, initial=initial)

        if not request.user.has_perm('kfet.add_order'):
            messages.error(request, 'Permission refusée')
        elif formset.is_valid():
            order = Order()
            order.supplier = supplier
            saved = False
            for form in formset:
                if form.cleaned_data['quantity_ordered'] is not None:
                    # The order is only saved once a first article has been
                    # ordered.
                    if not saved:
                        order.save()
                        saved = True

                    article = form.cleaned_data['article']
                    q_ordered = form.cleaned_data['quantity_ordered']
                    # Quantities are entered in boxes when the article has a
                    # box capacity, and stored in units.
                    if article.box_capacity:
                        q_ordered *= article.box_capacity
                    OrderArticle.objects.create(
                        order=order,
                        article=article,
                        quantity_ordered=q_ordered,
                    )
            if saved:
                messages.success(request, 'Commande créée')
                return redirect('kfet.order.read', order.pk)
            messages.warning(request, 'Rien commandé => Pas de commande')
        else:
            messages.error(request, 'Corrigez les erreurs')
    else:
        formset = cls_formset(initial=initial)

    return render(request, 'kfet/order_create.html', {
        'supplier': supplier,
        'formset': formset,
    })
class OrderRead(DetailView):
    """Detail view of an order, with its lines and a mail draft."""

    model = Order
    context_object_name = 'order'
    template_name = 'kfet/order_read.html'

    def get_context_data(self, **kwargs):
        """Add the order lines (``orderarts``) and a mail draft (``mail``).

        The mail lists every ordered article, with a blank line between
        categories. Quantities are expressed in boxes for the articles which
        have a ``box_capacity``.
        """
        context = super().get_context_data(**kwargs)
        orderarticles = (
            OrderArticle.objects
            .select_related('article', 'article__category')
            .filter(order=self.object)
            .order_by('article__category__name', 'article__name')
        )
        context['orderarts'] = orderarticles

        parts = [
            "Bonjour,\n\nNous voudrions pour le ##DATE## à la K-Fêt de "
            "l'ENS Ulm :"
        ]
        # 0 never equals a category: the first line opens a new section.
        current_category = 0
        for line in orderarticles:
            article = line.article
            if current_category != article.category:
                current_category = article.category
                parts.append('\n')
            quantity = line.quantity_ordered
            packaging = ''
            if article.box_capacity:
                quantity /= article.box_capacity
                # The box type gets an 's' from 2 boxes on.
                plural = 's' if quantity >= 2 else ''
                packaging = ' %s%s de' % (article.box_type, plural)
            parts.append("\n- %s%s %s" % (
                round(quantity), packaging, article.name.capitalize()))

        parts.append(
            "\n\nMerci d'appeler le numéro suivant lorsque les livreurs "
            "sont là : ##TELEPHONE##\nCordialement,\n##PRENOM## ##NOM## "
            ", pour la K-Fêt de l'ENS Ulm")
        context['mail'] = ''.join(parts)
        return context
@teamkfet_required
def order_to_inventory(request, pk):
    """Display and process the reception of an order as an inventory.

    `pk` is the primary key of the order. A 404 is raised when the order
    does not exist or already has an inventory.

    On POST, the user must have ``kfet.order_to_inventory``. In a single
    transaction, an inventory linked to the order is created, received
    quantities are stored on the order lines, article stocks are increased,
    and a new SupplierArticle is recorded for every article whose price, TVA
    or rights differ from the initial values of the form.
    """
    order = get_object_or_404(Order, pk=pk)

    if hasattr(order, 'inventory'):
        raise Http404

    # SupplierArticle rows of the order's supplier, ordered by descending
    # `at`, stored on each article as `article.supplier`.
    supplier_prefetch = Prefetch(
        'article__supplierarticle_set',
        queryset=(
            SupplierArticle.objects
            .filter(supplier=order.supplier)
            .order_by('-at')
        ),
        to_attr='supplier',
    )

    order_articles = (
        OrderArticle.objects
        .filter(order=order.pk)
        .select_related('article', 'article__category')
        .prefetch_related(supplier_prefetch)
        .order_by('article__category__name', 'article__name')
    )

    initial = []
    for line in order_articles:
        ordered_article = line.article
        supplier_info = ordered_article.supplier[0]
        initial.append({
            'article': ordered_article.pk,
            'name': ordered_article.name,
            'category': ordered_article.category_id,
            'category__name': ordered_article.category.name,
            'quantity_ordered': line.quantity_ordered,
            'quantity_received': line.quantity_ordered,
            'price_HT': supplier_info.price_HT,
            'TVA': supplier_info.TVA,
            'rights': supplier_info.rights,
        })

    cls_formset = formset_factory(OrderArticleToInventoryForm, extra=0)

    if request.method != 'POST':
        formset = cls_formset(initial=initial)
    else:
        formset = cls_formset(request.POST, initial=initial)

        if not request.user.has_perm('kfet.order_to_inventory'):
            messages.error(request, 'Permission refusée')
        elif not formset.is_valid():
            messages.error(request, "Corrigez les erreurs")
        else:
            with transaction.atomic():
                inventory = Inventory.objects.create(
                    order=order, by=request.user.profile.account_kfet,
                )
                supplier_rows = []
                inventory_rows = []
                for form in formset:
                    cleaned = form.cleaned_data
                    q_received = cleaned['quantity_received']
                    article = cleaned['article']
                    price_HT = cleaned['price_HT']
                    TVA = cleaned['TVA']
                    rights = cleaned['rights']

                    # Supplier data changed: a new row is recorded.
                    supplier_data_changed = (
                        form.initial['price_HT'] != price_HT
                        or form.initial['TVA'] != TVA
                        or form.initial['rights'] != rights
                    )
                    if supplier_data_changed:
                        supplier_rows.append(SupplierArticle(
                            supplier=order.supplier,
                            article=article,
                            price_HT=price_HT,
                            TVA=TVA,
                            rights=rights,
                        ))

                    OrderArticle.objects.filter(
                        order=order, article=article,
                    ).update(quantity_received=q_received)

                    inventory_rows.append(InventoryArticle(
                        inventory=inventory,
                        article=article,
                        stock_old=article.stock,
                        stock_new=article.stock + q_received,
                    ))
                    article.stock += q_received
                    # Receiving an article puts it on sale.
                    if q_received > 0:
                        article.is_sold = True
                    article.save()
                SupplierArticle.objects.bulk_create(supplier_rows)
                InventoryArticle.objects.bulk_create(inventory_rows)
            messages.success(request, "C'est tout bon !")
            return redirect('kfet.order')

    return render(request, 'kfet/order_to_inventory.html', {
        'formset': formset,
    })
class SupplierUpdate(SuccessMessageMixin, UpdateView):
    """Edit the contact details of a supplier.

    The editable fields are ``name``, ``address``, ``email``, ``phone`` and
    ``comment``.  After a successful update the user is redirected to the
    ``kfet.order`` url and a success message is queued.

    The ``kfet.change_supplier`` permission is checked when the form is
    validated: a user lacking it gets the form back with a non-field error
    instead of having the changes saved.
    """
    model = Supplier
    template_name = 'kfet/supplier_form.html'
    fields = ['name', 'address', 'email', 'phone', 'comment']
    success_url = reverse_lazy('kfet.order')
    # SuccessMessageMixin reads ``success_message``.  The attribute used to be
    # spelled ``sucess_message`` and was therefore never picked up, so no
    # confirmation was displayed after an update.
    success_message = 'Données fournisseur mis à jour'
    # Former (misspelled) name, kept as an alias for backward compatibility.
    sucess_message = success_message

    # Validation override
    def form_valid(self, form):
        """Save the supplier only if the user may change suppliers.

        Returns the response of ``form_invalid`` (with a non-field error
        added to the form) when the permission is missing, otherwise the
        response of the regular ``form_valid``.
        """
        # Checking permission
        if not self.request.user.has_perm('kfet.change_supplier'):
            form.add_error(None, 'Permission refusée')
            return self.form_invalid(form)

        # Updating
        return super(SupplierUpdate, self).form_valid(form)
# ==========
# Statistics
# ==========
# ---------------
# Vues génériques
# ---------------
class JSONDetailView(JSONResponseMixin, BaseDetailView):
    """Detail view whose response is JSON instead of a rendered template.

    The context is handed over to ``render_to_json_response``, which is
    expected to be provided by ``JSONResponseMixin``.
    """

    def render_to_response(self, context):
        """Return the JSON response built from ``context``."""
        json_response = self.render_to_json_response(context)
        return json_response
class PkUrlMixin(object):
    """Fetch the object using ``pk_url_kwarg`` as the model field name.

    The value captured in the url under the name ``pk_url_kwarg`` is matched
    against the model field bearing that same name.
    """

    def get_object(self, *args, **kwargs):
        """Return the matching instance of ``self.model`` or raise a 404."""
        field_name = self.pk_url_kwarg
        lookup = {field_name: self.kwargs.get(field_name)}
        return get_object_or_404(self.model, **lookup)
class SingleResumeStat(JSONDetailView):
    """Manifest of the stats available for a single object.

    The JSON payload holds a ``stats`` array in which every entry describes
    one stat: its label, the identifier of its button and the url from which
    its data can be retrieved.

    Class attributes read when building the manifest:
      - ``id_prefix``: combined with the object id to prefix identifiers;
      - ``nb_default``: exposed as ``default_stat``;
      - ``stats``: list of dicts with a ``label`` and optional ``url_params``;
      - ``url_stat``: name of the url reversed for every stat.
    """
    id_prefix = ''
    nb_default = 0
    stats = []
    url_stat = None

    def _stat_entry(self, prefix, index, stat_def):
        """Build the manifest entry of the ``index``-th stat definition."""
        url_pk = getattr(self.object, self.pk_url_kwarg)
        query = stat_def.get('url_params', {})
        # The query string is only appended when there are parameters.
        query_string = '?{}'.format(urlencode(query)) if len(query) else ''
        entry = {'label': stat_def['label']}
        entry['btn'] = 'btn_{}_{}'.format(prefix, index)
        entry['url'] = reverse(self.url_stat, args=[url_pk]) + query_string
        return entry

    def get_context_data(self, **kwargs):
        """Return the manifest; the parent context is deliberately dropped."""
        object_id = self.object.id
        prefix = '{}_{}'.format(self.id_prefix, object_id)
        manifest = [
            self._stat_entry(prefix, index, stat_def)
            for index, stat_def in enumerate(self.stats)
        ]
        return {
            'id_prefix': prefix,
            'content_id': "content_%s" % prefix,
            'stats': manifest,
            'default_stat': self.nb_default,
            'object_id': object_id,
        }
# --------------------------
# Personal balance evolution
# --------------------------
# Identifier prefix assigned to ``id_prefix`` of the account balance
# manifest view.
ID_PREFIX_ACC_BALANCE = "balance_acc"
class AccountStatBalanceList(PkUrlMixin, SingleResumeStat):
    """Manifest of the balance charts available for an account.

    The account is looked up by its ``trigramme``.  A logged-in user is
    required and only the user attached to the account may read it.
    """
    model = Account
    pk_url_kwarg = 'trigramme'
    context_object_name = 'account'
    id_prefix = ID_PREFIX_ACC_BALANCE
    url_stat = 'kfet.account.stat.balance'
    nb_default = 0
    # The first entry carries no url parameter; every other entry limits the
    # chart to the given number of last days.
    stats = [{'label': 'Tout le temps'}] + [
        {'label': label, 'url_params': {'last_days': days}}
        for label, days in (
            ('1 an', 365),
            ('6 mois', 183),
            ('3 mois', 90),
            ('30 jours', 30),
        )
    ]

    def get_object(self, *args, **kwargs):
        """Return the account, refusing access to anyone but its user."""
        account = super(AccountStatBalanceList, self).get_object(
            *args, **kwargs)
        requester = self.request.user
        if requester != account.user:
            raise PermissionDenied
        return account

    @method_decorator(login_required)
    def dispatch(self, *args, **kwargs):
        """Dispatch the request; anonymous users must log in first."""
        return super(AccountStatBalanceList, self).dispatch(*args, **kwargs)
class AccountStatBalance(PkUrlMixin, JSONDetailView):
    """Datasets of balance of an account.

    Operations and Transfers are taken into account.

    Optional query-string parameters:
      - ``last_days``: integer, restricts the data to the last N days and
        takes precedence over the two dates below;
      - ``begin_date`` / ``end_date``: ISO 8601 date or datetime bounds.

    A malformed parameter results in a 400 response.  A logged-in user is
    required and only the user attached to the account may read it.
    """
    model = Account
    pk_url_kwarg = 'trigramme'
    context_object_name = 'account'

    @staticmethod
    def _clean_date(value, name):
        """Turn a date bound into something usable for filtering.

        ``None`` and non-string values (e.g. datetimes given by a Python
        caller) are returned untouched; a blank string counts as ``None``.
        Other strings, which is what the query string provides, are parsed
        as a datetime or, failing that, as a date taken at midnight.

        Raises ``SuspiciousOperation`` (rendered by Django as a 400
        response) when the string cannot be parsed; ``name`` is only used
        in the error message.
        """
        import datetime
        from django.conf import settings
        from django.core.exceptions import SuspiciousOperation
        from django.utils.dateparse import parse_date, parse_datetime

        if value is None or not isinstance(value, str):
            return value
        if not value.strip():
            return None

        try:
            parsed = parse_datetime(value)
            if parsed is None:
                day = parse_date(value)
                if day is not None:
                    parsed = datetime.datetime.combine(day, datetime.time.min)
        except ValueError:
            # Well formatted but not a valid date (e.g. month 13).
            parsed = None
        if parsed is None:
            raise SuspiciousOperation(
                "Invalid value for '{}'".format(name))

        if settings.USE_TZ:
            if timezone.is_naive(parsed):
                parsed = timezone.make_aware(
                    parsed, timezone.get_current_timezone())
            # The entries are later sorted on their ISO string, which is
            # only meaningful if they share the same offset.
            # NOTE(review): assumes datetimes loaded from the database are
            # expressed in UTC, as ``timezone.now()`` is -- confirm.
            parsed = parsed.astimezone(datetime.timezone.utc)
        elif timezone.is_aware(parsed):
            parsed = timezone.make_naive(
                parsed, timezone.get_current_timezone())
        return parsed

    def get_changes_list(self, last_days=None, begin_date=None, end_date=None):
        """Return the balance changes of the account, most recent first.

        ``begin_date`` and ``end_date`` may be datetimes or strings (see
        ``_clean_date``); when ``last_days`` is given both are replaced by
        the period covering the last ``last_days`` days.

        Each entry is a dict with the keys ``at`` (ISO formatted date),
        ``amount`` and ``balance``.  Two entries with a zero amount mark the
        beginning and the end of the period.
        """
        from datetime import timedelta
        from django.core.exceptions import SuspiciousOperation

        account = self.object

        # prepare filters
        begin_date = self._clean_date(begin_date, 'begin_date')
        end_date = self._clean_date(end_date, 'end_date')
        if last_days is not None:
            end_date = timezone.now()
            try:
                begin_date = end_date - timedelta(days=last_days)
            except OverflowError:
                raise SuspiciousOperation("Invalid value for 'last_days'")

        # prepare querysets
        # TODO: drop the operation groups whose operations are all canceled
        opegroups = OperationGroup.objects.filter(on_acc=account)
        transfers = (
            Transfer.objects
            .filter(canceled_at=None)
            .select_related('group')
        )
        recv_transfers = transfers.filter(to_acc=account)
        sent_transfers = transfers.filter(from_acc=account)

        # apply filters
        if begin_date is not None:
            opegroups = opegroups.filter(at__gte=begin_date)
            recv_transfers = recv_transfers.filter(group__at__gte=begin_date)
            sent_transfers = sent_transfers.filter(group__at__gte=begin_date)

        if end_date is not None:
            opegroups = opegroups.filter(at__lte=end_date)
            recv_transfers = recv_transfers.filter(group__at__lte=end_date)
            sent_transfers = sent_transfers.filter(group__at__lte=end_date)

        # Everything becomes a list of dicts of the form
        # {'at': date,
        #  'amount': change of the balance (negative when it lowers the
        #            balance, positive when it raises it),
        #  'balance': filled in by the pass below (0 for now),
        # }
        # The first two entries are the bounds of the period.
        actions = [
            {
                'at': (begin_date or account.created_at).isoformat(),
                'amount': 0,
                'balance': 0,
            },
            {
                'at': (end_date or timezone.now()).isoformat(),
                'amount': 0,
                'balance': 0,
            },
        ]
        actions += [
            {
                'at': ope_grp.at.isoformat(),
                'amount': ope_grp.amount,
                'balance': 0,
            } for ope_grp in opegroups
        ] + [
            {
                'at': tr.group.at.isoformat(),
                'amount': tr.amount,
                'balance': 0,
            } for tr in recv_transfers
        ] + [
            {
                'at': tr.group.at.isoformat(),
                'amount': -tr.amount,
                'balance': 0,
            } for tr in sent_transfers
        ]

        # Sort from the most recent to the oldest, then walk back in time:
        # the most recent entry gets the current balance of the account and
        # each older entry gets the balance of the entry following it minus
        # its own amount.
        # NOTE(review): the current balance is used as starting point even
        # when ``end_date`` lies in the past.
        actions.sort(key=lambda action: action['at'], reverse=True)
        actions[0]['balance'] = account.balance
        for newer, older in zip(actions, actions[1:]):
            older['balance'] = newer['balance'] - older['amount']
        return actions

    def get_context_data(self, *args, **kwargs):
        """Return the chart description built from the query parameters.

        The parent context is not inherited.  Raises ``SuspiciousOperation``
        when ``last_days`` is not an integer.
        """
        from django.core.exceptions import SuspiciousOperation

        context = {}

        params = self.request.GET
        last_days = params.get('last_days', None)
        if last_days is not None:
            try:
                last_days = int(last_days)
            except ValueError:
                raise SuspiciousOperation("Invalid value for 'last_days'")

        changes = self.get_changes_list(
            last_days=last_days,
            begin_date=params.get('begin_date', None),
            end_date=params.get('end_date', None),
        )

        context['charts'] = [{
            "color": "rgb(255, 99, 132)",
            "label": "Balance",
            "values": changes,
        }]
        context['is_time_chart'] = True
        if changes:
            # ``changes`` is sorted from the most recent to the oldest.
            context['min_date'] = changes[-1]['at']
            context['max_date'] = changes[0]['at']
        # TODO: offset
        return context

    def get_object(self, *args, **kwargs):
        """Return the account, refusing access to anyone but its user."""
        obj = super().get_object(*args, **kwargs)
        if self.request.user != obj.user:
            raise PermissionDenied
        return obj

    @method_decorator(login_required)
    def dispatch(self, *args, **kwargs):
        """Dispatch the request; anonymous users must log in first."""
        return super(AccountStatBalance, self).dispatch(*args, **kwargs)
# --------------------
# Personal consumption
# --------------------
# Identifier prefix assigned to ``id_prefix`` of the account operations
# manifest view.
ID_PREFIX_ACC_LAST = "last_acc"
# The three prefixes below are not referenced in the part of the file
# reviewed here; presumably per-scale (days / weeks / months) variants
# -- TODO confirm against their users.
ID_PREFIX_ACC_LAST_DAYS = "last_days_acc"
ID_PREFIX_ACC_LAST_WEEKS = "last_weeks_acc"
ID_PREFIX_ACC_LAST_MONTHS = "last_months_acc"
class AccountStatOperationList(PkUrlMixin, SingleResumeStat):
    """Manifest of the operation charts available for an account.

    The account is looked up by its ``trigramme``.  A logged-in user is
    required and only the user attached to the account may read it.
    """
    model = Account
    pk_url_kwarg = 'trigramme'
    context_object_name = 'account'
    url_stat = 'kfet.account.stat.operation'
    id_prefix = ID_PREFIX_ACC_LAST
    # Stat definitions come from ``last_stats_manifest``, called with the
    # purchase operation type only.
    stats = last_stats_manifest(types=[Operation.PURCHASE])
    nb_default = 2

    def get_object(self, *args, **kwargs):
        """Return the account, refusing access to anyone but its user."""
        account = super(AccountStatOperationList, self).get_object(
            *args, **kwargs)
        requester = self.request.user
        if requester != account.user:
            raise PermissionDenied
        return account

    @method_decorator(login_required)
    def dispatch(self, *args, **kwargs):
        """Dispatch the request; anonymous users must log in first."""
        return super(AccountStatOperationList, self).dispatch(*args, **kwargs)
class AccountStatOperation(ScaleMixin, PkUrlMixin, JSONDetailView):
    """Datasets of operations of an account.

    The optional ``types`` query-string parameter is a Python literal
    collection (list, tuple or set) of operation types used to filter the
    operations; a malformed value results in a 400 response.

    A logged-in user is required and only the user attached to the account
    may read it.
    """
    model = Account
    pk_url_kwarg = 'trigramme'
    context_object_name = 'account'
    id_prefix = ""

    @staticmethod
    def _parse_types(raw):
        """Decode the ``types`` query parameter.

        Returns ``None`` when ``raw`` is ``None``, otherwise the collection
        described by the literal.

        Raises ``SuspiciousOperation`` (rendered by Django as a 400
        response) when ``raw`` is not a valid literal or does not describe
        a list, tuple or set.
        """
        from django.core.exceptions import SuspiciousOperation

        if raw is None:
            return None
        # SECURITY: ``raw`` is user-controlled.  ``literal_eval`` does not
        # execute code but it raises on malformed input, and deeply nested
        # input can exhaust the stack or memory; all of these are turned
        # into a bad request rather than a server error.
        try:
            types = ast.literal_eval(raw)
        except (ValueError, TypeError, SyntaxError, MemoryError,
                RuntimeError):
            raise SuspiciousOperation("Invalid value for 'types'")
        # Anything else (a string, a number, a dict...) would be a
        # meaningless operand for the ``type__in`` lookup.
        if not isinstance(types, (list, tuple, set)):
            raise SuspiciousOperation("Invalid value for 'types'")
        return types

    def get_operations(self, scale, types=None):
        """Return the operations of the account, chunked by ``scale``.

        Only non-canceled operations are kept; when ``types`` is given,
        only operations whose type is in ``types`` are kept.  The result is
        whatever ``scale.get_by_chunks`` returns.
        """
        # Select the non-canceled operations made on this account, then let
        # the scale distribute them over its time intervals.
        all_operations = (
            Operation.objects
            .filter(group__on_acc=self.object,
                    canceled_at=None)
            .values('article_nb', 'group__at')
            .order_by('group__at')
        )
        if types is not None:
            all_operations = all_operations.filter(type__in=types)
        chunks = scale.get_by_chunks(
            all_operations, field_db='group__at',
            field_callback=(lambda d: d['group__at']),
        )
        return chunks

    def get_context_data(self, *args, **kwargs):
        """Return the labels and the chart of items bought per interval.

        Only ``labels`` is kept from the parent context.
        """
        old_ctx = super().get_context_data(*args, **kwargs)
        context = {'labels': old_ctx['labels']}
        scale = self.scale

        types = self._parse_types(self.request.GET.get('types', None))

        operations = self.get_operations(types=types, scale=scale)
        # Number of articles per chunk
        nb_ventes = [
            sum(ope['article_nb'] for ope in chunk)
            for chunk in operations
        ]

        context['charts'] = [{"color": "rgb(255, 99, 132)",
                              "label": "NB items achetés",
                              "values": nb_ventes}]
        return context

    def get_object(self, *args, **kwargs):
        """Return the account, refusing access to anyone but its user."""
        obj = super().get_object(*args, **kwargs)
        if self.request.user != obj.user:
            raise PermissionDenied
        return obj

    @method_decorator(login_required)
    def dispatch(self, *args, **kwargs):
        """Dispatch the request; anonymous users must log in first."""
        return super().dispatch(*args, **kwargs)
# -------------------------
# Article statistics (last)
# -------------------------
# Identifier prefix assigned to ``id_prefix`` of the article sales manifest
# view.
ID_PREFIX_ART_LAST = "last_art"
# The three prefixes below are not referenced in the part of the file
# reviewed here; presumably per-scale (days / weeks / months) variants
# -- TODO confirm against their users.
ID_PREFIX_ART_LAST_DAYS = "last_days_art"
ID_PREFIX_ART_LAST_WEEKS = "last_weeks_art"
ID_PREFIX_ART_LAST_MONTHS = "last_months_art"
class ArticleStatSalesList(SingleResumeStat):
    """Manifest of the sales charts available for an article.

    Access goes through the ``teamkfet_required`` decorator.
    """
    model = Article
    context_object_name = 'article'
    url_stat = 'kfet.article.stat.sales'
    id_prefix = ID_PREFIX_ART_LAST
    # Stat definitions come from ``last_stats_manifest`` called without
    # argument.
    stats = last_stats_manifest()
    nb_default = 2

    @method_decorator(teamkfet_required)
    def dispatch(self, *args, **kwargs):
        """Dispatch the request once ``teamkfet_required`` lets it through."""
        return super(ArticleStatSalesList, self).dispatch(*args, **kwargs)
class ArticleStatSales(ScaleMixin, JSONDetailView):
    """Datasets of the sales of an article.

    Three series are produced per interval of the scale: every purchase,
    purchases made on the ``LIQ`` account and purchases made on any other
    account.  Access goes through the ``teamkfet_required`` decorator.
    """
    model = Article
    context_object_name = 'article'

    def get_context_data(self, *args, **kwargs):
        """Return the labels and the three sales charts of the article.

        Only ``labels`` is kept from the parent context.
        """
        parent_ctx = super().get_context_data(*args, **kwargs)
        context = {'labels': parent_ctx['labels']}
        scale = self.scale

        # Non-canceled purchases of this article, oldest first.
        purchases = Operation.objects.filter(
            type=Operation.PURCHASE,
            article=self.object,
            canceled_at=None,
        )
        purchases = purchases.values('group__at', 'article_nb')
        purchases = purchases.order_by('group__at')

        on_liq = purchases.filter(group__on_acc__trigramme='LIQ')
        on_accounts = purchases.exclude(group__on_acc__trigramme='LIQ')

        def by_chunks(queryset):
            # Same chunking arguments for both querysets.
            return scale.get_by_chunks(
                queryset, field_db='group__at',
                field_callback=lambda d: d['group__at'],
            )

        chunks_liq = by_chunks(on_liq)
        chunks_no_liq = by_chunks(on_accounts)

        # (accounts, LIQ) number of articles for every interval
        totals = [
            (
                sum(ope['article_nb'] for ope in chunk_no_liq),
                sum(ope['article_nb'] for ope in chunk_liq),
            )
            for chunk_liq, chunk_no_liq in zip(chunks_liq, chunks_no_liq)
        ]
        nb_ventes = [accounts + liq for accounts, liq in totals]
        nb_accounts = [accounts for accounts, _ in totals]
        nb_liq = [liq for _, liq in totals]

        context['charts'] = [
            {
                "color": "rgb(255, 99, 132)",
                "label": "Toutes consommations",
                "values": nb_ventes,
            },
            {
                "color": "rgb(54, 162, 235)",
                "label": "LIQ",
                "values": nb_liq,
            },
            {
                "color": "rgb(255, 205, 86)",
                "label": "Comptes K-Fêt",
                "values": nb_accounts,
            },
        ]
        return context

    @method_decorator(teamkfet_required)
    def dispatch(self, *args, **kwargs):
        """Dispatch the request once ``teamkfet_required`` lets it through."""
        return super(ArticleStatSales, self).dispatch(*args, **kwargs)