Merge branch 'master' into Aufinal/prettify-revente

Merge remote-tracking branch 'origin/master' into Aufinal/prettify-revente
This commit is contained in:
Ludovic Stephan 2019-01-07 22:34:28 +01:00
parent 47c02d72af
commit 445745ee15
43 changed files with 1788 additions and 453 deletions

View file

@ -27,10 +27,10 @@ test:
- sed -i.bak -E 's;^REDIS_PASSWD = .*$;REDIS_PASSWD = "";' cof/settings/secret.py - sed -i.bak -E 's;^REDIS_PASSWD = .*$;REDIS_PASSWD = "";' cof/settings/secret.py
# Remove the old test database if it has not been done yet # Remove the old test database if it has not been done yet
- psql --username=$POSTGRES_USER --host=$DBHOST -c "DROP DATABASE IF EXISTS test_$POSTGRES_DB" - psql --username=$POSTGRES_USER --host=$DBHOST -c "DROP DATABASE IF EXISTS test_$POSTGRES_DB"
- pip install --upgrade -r requirements.txt coverage - pip install --upgrade -r requirements.txt coverage tblib
- python --version - python --version
script: script:
- coverage run manage.py test - coverage run manage.py test --parallel
after_script: after_script:
- coverage report - coverage report
services: services:
@ -52,9 +52,9 @@ linters:
- pip install --upgrade black isort flake8 - pip install --upgrade black isort flake8
script: script:
- black --check . - black --check .
- isort --recursive --check-only --diff bda cof gestioncof kfet provisioning shared utils - isort --recursive --check-only --diff bda cof gestioncof kfet petitscours provisioning shared utils
# Print errors only # Print errors only
- flake8 --exit-zero bda cof gestioncof kfet provisioning shared utils - flake8 --exit-zero bda cof gestioncof kfet petitscours provisioning shared utils
cache: cache:
key: linters key: linters
paths: paths:

View file

@ -1,225 +1,357 @@
import json import json
import os
from datetime import timedelta from datetime import timedelta
from unittest import mock
from urllib.parse import urlencode
from django.conf import settings from django.test import TestCase
from django.contrib.auth.models import User from django.utils import formats, timezone
from django.core.management import call_command
from django.test import Client, TestCase
from django.utils import timezone
from ..models import CategorieSpectacle, Salle, Spectacle, Tirage from ..models import CategorieSpectacle, Participant, Salle
from .testcases import BdATestHelpers, BdAViewTestCaseMixin
from .utils import create_spectacle
def create_user(username, is_cof=False, is_buro=False): class InscriptionViewTestCase(BdATestHelpers, BdAViewTestCaseMixin, TestCase):
user = User.objects.create_user(username=username, password=username) url_name = "bda-tirage-inscription"
user.profile.is_cof = is_cof
user.profile.is_buro = is_buro
user.profile.save()
return user
http_methods = ["GET", "POST"]
def user_is_cof(user): auth_user = "bda_member"
return (user is not None) and user.profile.is_cof auth_forbidden = [None, "bda_other"]
bda_testdata = True
def user_is_staff(user): @property
return (user is not None) and user.profile.is_buro def url_kwargs(self):
return {"tirage_id": self.tirage.id}
@property
def url_expected(self):
return "/bda/inscription/{}".format(self.tirage.id)
class BdATestHelpers: def test_get_opened(self):
def setUp(self): self.tirage.ouverture = timezone.now() - timedelta(days=1)
# Some user with different access privileges self.tirage.fermeture = timezone.now() + timedelta(days=1)
staff = create_user(username="bda_staff", is_cof=True, is_buro=True) self.tirage.save()
staff_c = Client()
staff_c.force_login(staff)
member = create_user(username="bda_member", is_cof=True) resp = self.client.get(self.url)
member_c = Client()
member_c.force_login(member)
other = create_user(username="bda_other") self.assertEqual(resp.status_code, 200)
other_c = Client() self.assertFalse(resp.context["messages"])
other_c.force_login(other)
self.client_matrix = [ def test_get_closed_future(self):
(staff, staff_c), self.tirage.ouverture = timezone.now() + timedelta(days=1)
(member, member_c), self.tirage.fermeture = timezone.now() + timedelta(days=2)
(other, other_c), self.tirage.save()
(None, Client()),
]
def require_custommails(self): resp = self.client.get(self.url)
data_file = os.path.join(
settings.BASE_DIR, "gestioncof", "management", "data", "custommail.json"
)
call_command("syncmails", data_file, verbosity=0)
def check_restricted_access( self.assertEqual(resp.status_code, 200)
self, url, validate_user=user_is_cof, redirect_url=None self.assertIn(
): "Le tirage n'est pas encore ouvert : ouverture le {}".format(
def craft_redirect_url(user): formats.localize(timezone.template_localtime(self.tirage.ouverture))
if redirect_url: ),
return redirect_url [str(msg) for msg in resp.context["messages"]],
elif user is None:
# client is not logged in
login_url = "/login"
if url:
login_url += "?{}".format(urlencode({"next": url}, safe="/"))
return login_url
else:
return "/"
for (user, client) in self.client_matrix:
resp = client.get(url, follow=True)
if validate_user(user):
self.assertEqual(200, resp.status_code)
else:
self.assertRedirects(resp, craft_redirect_url(user))
class TestBdAViews(BdATestHelpers, TestCase):
def setUp(self):
# Signals handlers on login/logout send messages.
# Due to the way the Django' test Client performs login, this raise an
# error. As workaround, we mock the Django' messages module.
patcher_messages = mock.patch("gestioncof.signals.messages")
patcher_messages.start()
self.addCleanup(patcher_messages.stop)
# Set up the helpers
super().setUp()
# Some BdA stuff
self.tirage = Tirage.objects.create(
title="Test tirage",
appear_catalogue=True,
ouverture=timezone.now(),
fermeture=timezone.now(),
)
self.category = CategorieSpectacle.objects.create(name="Category")
self.location = Salle.objects.create(name="here")
Spectacle.objects.bulk_create(
[
Spectacle(
title="foo",
date=timezone.now(),
location=self.location,
price=0,
slots=42,
tirage=self.tirage,
listing=False,
category=self.category,
),
Spectacle(
title="bar",
date=timezone.now(),
location=self.location,
price=1,
slots=142,
tirage=self.tirage,
listing=False,
category=self.category,
),
Spectacle(
title="baz",
date=timezone.now(),
location=self.location,
price=2,
slots=242,
tirage=self.tirage,
listing=False,
category=self.category,
),
]
) )
def test_bda_inscriptions(self): def test_get_closed_past(self):
# TODO: test the form self.tirage.ouverture = timezone.now() - timedelta(days=2)
url = "/bda/inscription/{}".format(self.tirage.id) self.tirage.fermeture = timezone.now() - timedelta(days=1)
self.check_restricted_access(url) self.tirage.save()
def test_bda_places(self): resp = self.client.get(self.url)
url = "/bda/places/{}".format(self.tirage.id)
self.check_restricted_access(url)
def test_etat_places(self): self.assertEqual(resp.status_code, 200)
url = "/bda/etat-places/{}".format(self.tirage.id) self.assertIn(
self.check_restricted_access(url) " C'est fini : tirage au sort dans la journée !",
[str(msg) for msg in resp.context["messages"]],
)
def test_perform_tirage(self): def get_base_post_data(self):
# Only staff member can perform a tirage return {
url = "/bda/tirage/{}".format(self.tirage.id) "choixspectacle_set-TOTAL_FORMS": "3",
self.check_restricted_access(url, validate_user=user_is_staff) "choixspectacle_set-INITIAL_FORMS": "0",
"choixspectacle_set-MIN_NUM_FORMS": "0",
"choixspectacle_set-MAX_NUM_FORMS": "1000",
}
_, staff_c = self.client_matrix[0] base_post_data = property(get_base_post_data)
def test_post(self):
self.tirage.ouverture = timezone.now() - timedelta(days=1)
self.tirage.fermeture = timezone.now() + timedelta(days=1)
self.tirage.save()
data = dict(
self.base_post_data,
**{
"choixspectacle_set-TOTAL_FORMS": "2",
"choixspectacle_set-0-id": "",
"choixspectacle_set-0-participant": "",
"choixspectacle_set-0-spectacle": str(self.show1.pk),
"choixspectacle_set-0-double_choice": "1",
"choixspectacle_set-0-priority": "2",
"choixspectacle_set-1-id": "",
"choixspectacle_set-1-participant": "",
"choixspectacle_set-1-spectacle": str(self.show2.pk),
"choixspectacle_set-1-double_choice": "autoquit",
"choixspectacle_set-1-priority": "1",
}
)
resp = self.client.post(self.url, data)
self.assertEqual(resp.status_code, 200)
self.assertIn(
"Votre inscription a été mise à jour avec succès !",
[str(msg) for msg in resp.context["messages"]],
)
participant = Participant.objects.get(
user=self.users["bda_member"], tirage=self.tirage
)
self.assertSetEqual(
set(
participant.choixspectacle_set.values_list(
"priority", "spectacle_id", "double_choice"
)
),
{(1, self.show2.pk, "autoquit"), (2, self.show1.pk, "1")},
)
def test_post_state_changed(self):
self.tirage.ouverture = timezone.now() - timedelta(days=1)
self.tirage.fermeture = timezone.now() + timedelta(days=1)
self.tirage.save()
data = {"dbstate": "different"}
resp = self.client.post(self.url, data)
self.assertEqual(resp.status_code, 200)
self.assertIn(
"Impossible d'enregistrer vos modifications : vous avez apporté d'autres "
"modifications entre temps.",
[str(msg) for msg in resp.context["messages"]],
)
class PlacesViewTestCase(BdATestHelpers, BdAViewTestCaseMixin, TestCase):
url_name = "bda-places-attribuees"
auth_user = "bda_member"
auth_forbidden = [None, "bda_other"]
bda_testdata = True
@property
def url_kwargs(self):
return {"tirage_id": self.tirage.id}
@property
def url_expected(self):
return "/bda/places/{}".format(self.tirage.id)
class EtatPlacesViewTestCase(BdATestHelpers, BdAViewTestCaseMixin, TestCase):
url_name = "bda-etat-places"
auth_user = "bda_member"
auth_forbidden = [None, "bda_other"]
bda_testdata = True
@property
def url_kwargs(self):
return {"tirage_id": self.tirage.id}
@property
def url_expected(self):
return "/bda/etat-places/{}".format(self.tirage.id)
class TirageViewTestCase(BdATestHelpers, BdAViewTestCaseMixin, TestCase):
url_name = "bda-tirage"
http_methods = ["GET", "POST"]
auth_user = "bda_staff"
auth_forbidden = [None, "bda_other", "bda_member"]
bda_testdata = True
@property
def url_kwargs(self):
return {"tirage_id": self.tirage.id}
@property
def url_expected(self):
return "/bda/tirage/{}".format(self.tirage.id)
def test_perform_tirage_disabled(self):
# Cannot be performed if disabled # Cannot be performed if disabled
self.tirage.enable_do_tirage = False self.tirage.enable_do_tirage = False
self.tirage.save() self.tirage.save()
resp = staff_c.get(url) resp = self.client.get(self.url)
self.assertTemplateUsed(resp, "tirage-failed.html") self.assertTemplateUsed(resp, "tirage-failed.html")
def test_perform_tirage_opened_registrations(self):
# Cannot be performed if registrations are still open # Cannot be performed if registrations are still open
self.tirage.enable_do_tirage = True self.tirage.enable_do_tirage = True
self.tirage.fermeture = timezone.now() + timedelta(seconds=3600) self.tirage.fermeture = timezone.now() + timedelta(seconds=3600)
self.tirage.save() self.tirage.save()
resp = staff_c.get(url) resp = self.client.get(self.url)
self.assertTemplateUsed(resp, "tirage-failed.html") self.assertTemplateUsed(resp, "tirage-failed.html")
def test_perform_tirage(self):
# Otherwise, perform the tirage # Otherwise, perform the tirage
self.tirage.enable_do_tirage = True
self.tirage.fermeture = timezone.now() self.tirage.fermeture = timezone.now()
self.tirage.save() self.tirage.save()
resp = staff_c.get(url) resp = self.client.get(self.url)
self.assertTemplateNotUsed(resp, "tirage-failed.html") self.assertTemplateNotUsed(resp, "tirage-failed.html")
def test_spectacles_list(self):
url = "/bda/spectacles/{}".format(self.tirage.id)
self.check_restricted_access(url, validate_user=user_is_staff)
def test_spectacle_detail(self): class SpectacleListViewTestCase(BdATestHelpers, BdAViewTestCaseMixin, TestCase):
show = self.tirage.spectacle_set.first() url_name = "bda-liste-spectacles"
url = "/bda/spectacles/{}/{}".format(self.tirage.id, show.id)
self.check_restricted_access(url, validate_user=user_is_staff)
def test_tirage_unpaid(self): auth_user = "bda_staff"
url = "/bda/spectacles/unpaid/{}".format(self.tirage.id) auth_forbidden = [None, "bda_other", "bda_member"]
self.check_restricted_access(url, validate_user=user_is_staff)
def test_send_reminders(self): bda_testdata = True
@property
def url_kwargs(self):
return {"tirage_id": self.tirage.id}
@property
def url_expected(self):
return "/bda/spectacles/{}".format(self.tirage.id)
class SpectacleViewTestCase(BdATestHelpers, BdAViewTestCaseMixin, TestCase):
url_name = "bda-spectacle"
auth_user = "bda_staff"
auth_forbidden = [None, "bda_other", "bda_member"]
bda_testdata = True
@property
def url_kwargs(self):
return {"tirage_id": self.tirage.id, "spectacle_id": self.show1.id}
@property
def url_expected(self):
return "/bda/spectacles/{}/{}".format(self.tirage.id, self.show1.id)
class UnpaidViewTestCase(BdATestHelpers, BdAViewTestCaseMixin, TestCase):
url_name = "bda-unpaid"
auth_user = "bda_staff"
auth_forbidden = [None, "bda_other", "bda_member"]
bda_testdata = True
@property
def url_kwargs(self):
return {"tirage_id": self.tirage.id}
@property
def url_expected(self):
return "/bda/spectacles/unpaid/{}".format(self.tirage.id)
class SendRemindersViewTestCase(BdATestHelpers, BdAViewTestCaseMixin, TestCase):
url_name = "bda-rappels"
auth_user = "bda_staff"
auth_forbidden = [None, "bda_other", "bda_member"]
bda_testdata = True
@property
def url_kwargs(self):
return {"spectacle_id": self.show1.id}
@property
def url_expected(self):
return "/bda/mails-rappel/{}".format(self.show1.id)
def test_post(self):
self.require_custommails() self.require_custommails()
# Just get the page resp = self.client.post(self.url)
show = self.tirage.spectacle_set.first()
url = "/bda/mails-rappel/{}".format(show.id)
self.check_restricted_access(url, validate_user=user_is_staff)
# Actually send the reminder emails
_, staff_c = self.client_matrix[0]
resp = staff_c.post(url)
self.assertEqual(200, resp.status_code) self.assertEqual(200, resp.status_code)
# TODO: check that emails are sent # TODO: check that emails are sent
def test_catalogue_api(self):
class DescriptionsSpectaclesViewTestCase(
BdATestHelpers, BdAViewTestCaseMixin, TestCase
):
url_name = "bda-descriptions"
auth_user = None
auth_forbidden = []
bda_testdata = True
@property
def url_kwargs(self):
return {"tirage_id": self.tirage.pk}
@property
def url_expected(self):
return "/bda/descriptions/{}".format(self.tirage.pk)
def test_get(self):
resp = self.client.get(self.url)
self.assertEqual(resp.status_code, 200)
self.assertListEqual(
list(resp.context["shows"]), [self.show1, self.show2, self.show3]
)
def test_get_filter_category(self):
category1 = CategorieSpectacle.objects.create(name="Category 1")
category2 = CategorieSpectacle.objects.create(name="Category 2")
show1 = create_spectacle(category=category1, tirage=self.tirage)
show2 = create_spectacle(category=category2, tirage=self.tirage)
resp = self.client.get(self.url, {"category": "Category 1"})
self.assertEqual(resp.status_code, 200)
self.assertListEqual(list(resp.context["shows"]), [show1])
resp = self.client.get(self.url, {"category": "Category 2"})
self.assertEqual(resp.status_code, 200)
self.assertListEqual(list(resp.context["shows"]), [show2])
def test_get_filter_location(self):
location1 = Salle.objects.create(name="Location 1")
location2 = Salle.objects.create(name="Location 2")
show1 = create_spectacle(location=location1, tirage=self.tirage)
show2 = create_spectacle(location=location2, tirage=self.tirage)
resp = self.client.get(self.url, {"location": str(location1.pk)})
self.assertEqual(resp.status_code, 200)
self.assertListEqual(list(resp.context["shows"]), [show1])
resp = self.client.get(self.url, {"location": str(location2.pk)})
self.assertEqual(resp.status_code, 200)
self.assertListEqual(list(resp.context["shows"]), [show2])
class CatalogueViewTestCase(BdATestHelpers, BdAViewTestCaseMixin, TestCase):
auth_user = None
auth_forbidden = []
bda_testdata = True
def test_api_list(self):
url_list = "/bda/catalogue/list" url_list = "/bda/catalogue/list"
url_details = "/bda/catalogue/details?id={}".format(self.tirage.id) resp = self.client.get(url_list)
url_descriptions = "/bda/catalogue/descriptions?id={}".format(self.tirage.id)
# Anyone can get
def anyone_can_get(url):
self.check_restricted_access(url, validate_user=lambda user: True)
anyone_can_get(url_list)
anyone_can_get(url_details)
anyone_can_get(url_descriptions)
# The resulting JSON contains the information
_, client = self.client_matrix[0]
# List
resp = client.get(url_list)
self.assertJSONEqual( self.assertJSONEqual(
resp.content.decode("utf-8"), resp.content.decode("utf-8"),
[{"id": self.tirage.id, "title": self.tirage.title}], [{"id": self.tirage.id, "title": self.tirage.title}],
) )
# Details def test_api_details(self):
resp = client.get(url_details) url_details = "/bda/catalogue/details?id={}".format(self.tirage.id)
resp = self.client.get(url_details)
self.assertJSONEqual( self.assertJSONEqual(
resp.content.decode("utf-8"), resp.content.decode("utf-8"),
{ {
@ -228,8 +360,9 @@ class TestBdAViews(BdATestHelpers, TestCase):
}, },
) )
# Descriptions def test_api_descriptions(self):
resp = client.get(url_descriptions) url_descriptions = "/bda/catalogue/descriptions?id={}".format(self.tirage.id)
resp = self.client.get(url_descriptions)
raw = resp.content.decode("utf-8") raw = resp.content.decode("utf-8")
try: try:
results = json.loads(raw) results = json.loads(raw)

75
bda/tests/testcases.py Normal file
View file

@ -0,0 +1,75 @@
import os
from django.conf import settings
from django.core.management import call_command
from django.utils import timezone
from shared.tests.testcases import ViewTestCaseMixin
from ..models import CategorieSpectacle, Salle, Spectacle, Tirage
from .utils import create_user
class BdAViewTestCaseMixin(ViewTestCaseMixin):
def get_users_base(self):
return {
"bda_other": create_user(username="bda_other"),
"bda_member": create_user(username="bda_member", is_cof=True),
"bda_staff": create_user(username="bda_staff", is_cof=True, is_buro=True),
}
class BdATestHelpers:
bda_testdata = False
def setUp(self):
super().setUp()
if self.bda_testdata:
self.load_bda_testdata()
def require_custommails(self):
data_file = os.path.join(
settings.BASE_DIR, "gestioncof", "management", "data", "custommail.json"
)
call_command("syncmails", data_file, verbosity=0)
def load_bda_testdata(self):
self.tirage = Tirage.objects.create(
title="Test tirage",
appear_catalogue=True,
ouverture=timezone.now(),
fermeture=timezone.now(),
)
self.category = CategorieSpectacle.objects.create(name="Category")
self.location = Salle.objects.create(name="here")
self.show1 = Spectacle.objects.create(
title="foo",
date=timezone.now(),
location=self.location,
price=0,
slots=42,
tirage=self.tirage,
listing=False,
category=self.category,
)
self.show2 = Spectacle.objects.create(
title="bar",
date=timezone.now(),
location=self.location,
price=1,
slots=142,
tirage=self.tirage,
listing=False,
category=self.category,
)
self.show3 = Spectacle.objects.create(
title="baz",
date=timezone.now(),
location=self.location,
price=2,
slots=242,
tirage=self.tirage,
listing=False,
category=self.category,
)

36
bda/tests/utils.py Normal file
View file

@ -0,0 +1,36 @@
from datetime import timedelta
from django.contrib.auth.models import User
from django.utils import timezone
from ..models import CategorieSpectacle, Salle, Spectacle, Tirage
def create_user(username, is_cof=False, is_buro=False):
user = User.objects.create_user(username=username, password=username)
user.profile.is_cof = is_cof
user.profile.is_buro = is_buro
user.profile.save()
return user
def user_is_cof(user):
return (user is not None) and user.profile.is_cof
def user_is_staff(user):
return (user is not None) and user.profile.is_buro
def create_spectacle(**kwargs):
defaults = {
"title": "Title",
"category": CategorieSpectacle.objects.first(),
"date": (timezone.now() + timedelta(days=7)).date(),
"location": Salle.objects.first(),
"price": 10.0,
"slots": 20,
"tirage": Tirage.objects.first(),
"listing": False,
}
return Spectacle.objects.create(**dict(defaults, **kwargs))

View file

@ -12,7 +12,7 @@ urlpatterns = [
), ),
url(r"^places/(?P<tirage_id>\d+)$", views.places, name="bda-places-attribuees"), url(r"^places/(?P<tirage_id>\d+)$", views.places, name="bda-places-attribuees"),
url(r"^etat-places/(?P<tirage_id>\d+)$", views.etat_places, name="bda-etat-places"), url(r"^etat-places/(?P<tirage_id>\d+)$", views.etat_places, name="bda-etat-places"),
url(r"^tirage/(?P<tirage_id>\d+)$", views.tirage), url(r"^tirage/(?P<tirage_id>\d+)$", views.tirage, name="bda-tirage"),
url( url(
r"^spectacles/(?P<tirage_id>\d+)$", r"^spectacles/(?P<tirage_id>\d+)$",
buro_required(SpectacleListView.as_view()), buro_required(SpectacleListView.as_view()),

View file

@ -68,6 +68,7 @@ INSTALLED_APPS = [
"django.contrib.admin", "django.contrib.admin",
"django.contrib.admindocs", "django.contrib.admindocs",
"bda", "bda",
"petitscours",
"captcha", "captcha",
"django_cas_ng", "django_cas_ng",
"bootstrapform", "bootstrapform",

View file

@ -37,7 +37,7 @@ def show_toolbar(request):
machine physique n'est pas forcément connue, et peut difficilement être machine physique n'est pas forcément connue, et peut difficilement être
mise dans les INTERNAL_IPS. mise dans les INTERNAL_IPS.
""" """
return DEBUG return DEBUG and not request.path.startswith("/admin/")
if not TESTING: if not TESTING:

View file

@ -20,7 +20,6 @@ from gestioncof.urls import (
clubs_patterns, clubs_patterns,
events_patterns, events_patterns,
export_patterns, export_patterns,
petitcours_patterns,
surveys_patterns, surveys_patterns,
) )
@ -34,7 +33,7 @@ urlpatterns = [
# Les exports # Les exports
url(r"^export/", include(export_patterns)), url(r"^export/", include(export_patterns)),
# Les petits cours # Les petits cours
url(r"^petitcours/", include(petitcours_patterns)), url(r"^petitcours/", include("petitscours.urls")),
# Les sondages # Les sondages
url(r"^survey/", include(surveys_patterns)), url(r"^survey/", include(surveys_patterns)),
# Evenements # Evenements

View file

@ -20,7 +20,7 @@ from gestioncof.models import (
SurveyQuestion, SurveyQuestion,
SurveyQuestionAnswer, SurveyQuestionAnswer,
) )
from gestioncof.petits_cours_models import ( from petitscours.models import (
PetitCoursAbility, PetitCoursAbility,
PetitCoursAttribution, PetitCoursAttribution,
PetitCoursAttributionCounter, PetitCoursAttributionCounter,

View file

@ -1,23 +1,55 @@
from django.contrib.auth.decorators import user_passes_test from functools import wraps
from django.contrib.auth.decorators import login_required, user_passes_test
from django.core.exceptions import PermissionDenied
from django.shortcuts import render
def is_cof(user): def cof_required(view_func):
try: """Décorateur qui vérifie que l'utilisateur est connecté et membre du COF.
profile = user.profile
return profile.is_cof - Si l'utilisteur n'est pas connecté, il est redirigé vers la page de
except Exception: connexion
return False - Si l'utilisateur est connecté mais pas membre du COF, il obtient une
page d'erreur lui demandant de s'inscrire au COF
"""
def is_cof(user):
try:
return user.profile.is_cof
except AttributeError:
return False
@wraps(view_func)
def _wrapped_view(request, *args, **kwargs):
if is_cof(request.user):
return view_func(request, *args, **kwargs)
return render(request, "cof-denied.html", status=403)
return login_required(_wrapped_view)
cof_required = user_passes_test(is_cof) def buro_required(view_func):
"""Décorateur qui vérifie que l'utilisateur est connecté et membre du burô.
- Si l'utilisateur n'est pas connecté, il est redirigé vers la page de
connexion
- Si l'utilisateur est connecté mais pas membre du burô, il obtient une
page d'erreur 403 Forbidden
"""
def is_buro(user): def is_buro(user):
try: try:
profile = user.profile return user.profile.is_buro
return profile.is_buro except AttributeError:
except Exception: return False
return False
@wraps(view_func)
def _wrapped_view(request, *args, **kwargs):
if is_buro(request.user):
return view_func(request, *args, **kwargs)
buro_required = user_passes_test(is_buro) return render(request, "buro-denied.html", status=403)
return login_required(_wrapped_view)

View file

@ -14,7 +14,7 @@ from django.contrib.auth.models import User
from django.core.management import call_command from django.core.management import call_command
from gestioncof.management.base import MyBaseCommand from gestioncof.management.base import MyBaseCommand
from gestioncof.petits_cours_models import ( from petitscours.models import (
LEVELS_CHOICES, LEVELS_CHOICES,
PetitCoursAbility, PetitCoursAbility,
PetitCoursAttributionCounter, PetitCoursAttributionCounter,

View file

@ -5,7 +5,7 @@ from django.dispatch import receiver
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from bda.models import Spectacle from bda.models import Spectacle
from gestioncof.petits_cours_models import choices_length from petitscours.models import choices_length
TYPE_COMMENT_FIELD = (("text", _("Texte long")), ("char", _("Texte court"))) TYPE_COMMENT_FIELD = (("text", _("Texte long")), ("char", _("Texte court")))

View file

@ -15,7 +15,7 @@ def messages_on_out_login(request, user, **kwargs):
@receiver(cas_user_authenticated) @receiver(cas_user_authenticated)
def mesagges_on_cas_login(request, user, **kwargs): def messages_on_cas_login(request, user, **kwargs):
msg = _("Connexion à GestioCOF par CAS réussie. Bienvenue {}.").format( msg = _("Connexion à GestioCOF par CAS réussie. Bienvenue {}.").format(
user.get_short_name() user.get_short_name()
) )

View file

@ -0,0 +1,5 @@
{% extends "base_title.html" %}
{% block realcontent %}
<h2>Section réservée au Burô.</h2>
{% endblock %}

View file

@ -1,15 +1,4 @@
import os
from django.conf import settings
from django.contrib.auth import get_user_model from django.contrib.auth import get_user_model
from django.core.management import call_command
from gestioncof.petits_cours_models import (
PetitCoursAbility,
PetitCoursAttributionCounter,
PetitCoursDemande,
PetitCoursSubject,
)
User = get_user_model() User = get_user_model()
@ -77,31 +66,3 @@ def create_root(username, attrs=None):
attrs.setdefault("is_staff", True) attrs.setdefault("is_staff", True)
attrs.setdefault("is_superuser", True) attrs.setdefault("is_superuser", True)
return _create_user(username, attrs=attrs) return _create_user(username, attrs=attrs)
def create_petitcours_ability(**kwargs):
if "user" not in kwargs:
kwargs["user"] = create_user()
if "matiere" not in kwargs:
kwargs["matiere"] = create_petitcours_subject()
if "niveau" not in kwargs:
kwargs["niveau"] = "college"
ability = PetitCoursAbility.objects.create(**kwargs)
PetitCoursAttributionCounter.get_uptodate(ability.user, ability.matiere)
return ability
def create_petitcours_demande(**kwargs):
return PetitCoursDemande.objects.create(**kwargs)
def create_petitcours_subject(**kwargs):
return PetitCoursSubject.objects.create(**kwargs)
class PetitCoursTestHelpers:
def require_custommails(self):
data_file = os.path.join(
settings.BASE_DIR, "gestioncof", "management", "data", "custommail.json"
)
call_command("syncmails", data_file, verbosity=0)

View file

@ -1,8 +1,7 @@
from django.conf.urls import url from django.conf.urls import url
from gestioncof import petits_cours_views, views from gestioncof import views
from gestioncof.decorators import buro_required from gestioncof.decorators import buro_required
from gestioncof.petits_cours_views import DemandeDetailView, DemandeListView
export_patterns = [ export_patterns = [
url(r"^members$", views.export_members, name="cof.membres_export"), url(r"^members$", views.export_members, name="cof.membres_export"),
@ -21,40 +20,6 @@ export_patterns = [
url(r"^mega$", views.export_mega, name="cof.mega_export"), url(r"^mega$", views.export_mega, name="cof.mega_export"),
] ]
petitcours_patterns = [
url(
r"^inscription$",
petits_cours_views.inscription,
name="petits-cours-inscription",
),
url(r"^demande$", petits_cours_views.demande, name="petits-cours-demande"),
url(
r"^demande-raw$",
petits_cours_views.demande_raw,
name="petits-cours-demande-raw",
),
url(
r"^demandes$",
buro_required(DemandeListView.as_view()),
name="petits-cours-demandes-list",
),
url(
r"^demandes/(?P<pk>\d+)$",
buro_required(DemandeDetailView.as_view()),
name="petits-cours-demande-details",
),
url(
r"^demandes/(?P<demande_id>\d+)/traitement$",
petits_cours_views.traitement,
name="petits-cours-demande-traitement",
),
url(
r"^demandes/(?P<demande_id>\d+)/retraitement$",
petits_cours_views.retraitement,
name="petits-cours-demande-retraitement",
),
]
surveys_patterns = [ surveys_patterns = [
url( url(
r"^(?P<survey_id>\d+)/status$", r"^(?P<survey_id>\d+)/status$",

View file

@ -1,4 +1,5 @@
import json import json
import random
from datetime import timedelta from datetime import timedelta
from unittest import mock from unittest import mock
@ -8,7 +9,7 @@ from django.contrib.auth.models import AnonymousUser, Permission, User
from django.test import Client from django.test import Client
from django.utils import timezone from django.utils import timezone
from . import OpenKfet, kfet_open from . import OpenKfet
from .consumers import OpenKfetConsumer from .consumers import OpenKfetConsumer
@ -16,10 +17,10 @@ class OpenKfetTest(ChannelTestCase):
"""OpenKfet object unit-tests suite.""" """OpenKfet object unit-tests suite."""
def setUp(self): def setUp(self):
self.kfet_open = OpenKfet() self.kfet_open = OpenKfet(
cache_prefix="test_kfetopen_%s" % random.randrange(2 ** 20)
def tearDown(self): )
self.kfet_open.clear_cache() self.addCleanup(self.kfet_open.clear_cache)
def test_defaults(self): def test_defaults(self):
"""Default values.""" """Default values."""
@ -136,8 +137,14 @@ class OpenKfetViewsTest(ChannelTestCase):
self.c_a = Client() self.c_a = Client()
self.c_a.login(username="admin", password="admin") self.c_a.login(username="admin", password="admin")
def tearDown(self): self.kfet_open = OpenKfet(
kfet_open.clear_cache() cache_prefix="test_kfetopen_%s" % random.randrange(2 ** 20)
)
self.addCleanup(self.kfet_open.clear_cache)
views_patcher = mock.patch("kfet.open.views.kfet_open", self.kfet_open)
views_patcher.start()
self.addCleanup(views_patcher.stop)
def test_door(self): def test_door(self):
"""Edit raw_status.""" """Edit raw_status."""
@ -146,14 +153,14 @@ class OpenKfetViewsTest(ChannelTestCase):
"/k-fet/open/raw_open", {"raw_open": sent, "token": "plop"} "/k-fet/open/raw_open", {"raw_open": sent, "token": "plop"}
) )
self.assertEqual(200, resp.status_code) self.assertEqual(200, resp.status_code)
self.assertEqual(expected, kfet_open.raw_open) self.assertEqual(expected, self.kfet_open.raw_open)
def test_force_close(self): def test_force_close(self):
"""Edit force_close.""" """Edit force_close."""
for sent, expected in [(1, True), (0, False)]: for sent, expected in [(1, True), (0, False)]:
resp = self.c_a.post("/k-fet/open/force_close", {"force_close": sent}) resp = self.c_a.post("/k-fet/open/force_close", {"force_close": sent})
self.assertEqual(200, resp.status_code) self.assertEqual(200, resp.status_code)
self.assertEqual(expected, kfet_open.force_close) self.assertEqual(expected, self.kfet_open.force_close)
def test_force_close_forbidden(self): def test_force_close_forbidden(self):
"""Can't edit force_close without kfet.can_force_close permission.""" """Can't edit force_close without kfet.can_force_close permission."""
@ -236,8 +243,10 @@ class OpenKfetScenarioTest(ChannelTestCase):
self.r_c_ws = WSClient() self.r_c_ws = WSClient()
self.r_c_ws.force_login(self.r) self.r_c_ws.force_login(self.r)
def tearDown(self): self.kfet_open = OpenKfet(
kfet_open.clear_cache() cache_prefix="test_kfetopen_%s" % random.randrange(2 ** 20)
)
self.addCleanup(self.kfet_open.clear_cache)
def ws_connect(self, ws_client): def ws_connect(self, ws_client):
ws_client.send_and_consume( ws_client.send_and_consume(
@ -288,8 +297,8 @@ class OpenKfetScenarioTest(ChannelTestCase):
def test_scenario_2(self): def test_scenario_2(self):
"""Starting falsely closed, clients connect, disable force close.""" """Starting falsely closed, clients connect, disable force close."""
kfet_open.raw_open = True self.kfet_open.raw_open = True
kfet_open.force_close = True self.kfet_open.force_close = True
msg = self.ws_connect(self.c_ws) msg = self.ws_connect(self.c_ws)
self.assertEqual(OpenKfet.CLOSED, msg["status"]) self.assertEqual(OpenKfet.CLOSED, msg["status"])

View file

@ -1,3 +1,6 @@
from decimal import Decimal
from unittest import mock
from django.contrib.auth import get_user_model from django.contrib.auth import get_user_model
from django.contrib.auth.models import Permission from django.contrib.auth.models import Permission
from django.contrib.contenttypes.models import ContentType from django.contrib.contenttypes.models import ContentType
@ -5,9 +8,16 @@ from django.test import TestCase
from gestioncof.models import CofProfile from gestioncof.models import CofProfile
from ..models import Account from ..models import Account, Article, ArticleCategory, Checkout, Operation
from .testcases import TestCaseMixin from .testcases import TestCaseMixin
from .utils import create_root, create_team, create_user, get_perms, user_add_perms from .utils import (
create_operation_group,
create_root,
create_team,
create_user,
get_perms,
user_add_perms,
)
User = get_user_model() User = get_user_model()
@ -86,3 +96,80 @@ class PermHelpersTest(TestCaseMixin, TestCase):
map(repr, [self.perm1, self.perm2, self.perm_team]), map(repr, [self.perm1, self.perm2, self.perm_team]),
ordered=False, ordered=False,
) )
class OperationHelpersTest(TestCase):
def test_create_operation_group(self):
operation_group = create_operation_group()
on_acc = Account.objects.get(cofprofile__user__username="user")
checkout = Checkout.objects.get(name="Checkout")
self.assertDictEqual(
operation_group.__dict__,
{
"_checkout_cache": checkout,
"_on_acc_cache": on_acc,
"_state": mock.ANY,
"amount": 0,
"at": mock.ANY,
"checkout_id": checkout.pk,
"comment": "",
"id": mock.ANY,
"is_cof": False,
"on_acc_id": on_acc.pk,
"valid_by_id": None,
},
)
self.assertFalse(operation_group.opes.all())
def test_create_operation_group_with_content(self):
article_category = ArticleCategory.objects.create(name="Category")
article1 = Article.objects.create(
category=article_category, name="Article 1", price=Decimal("2.50")
)
article2 = Article.objects.create(
category=article_category, name="Article 2", price=Decimal("4.00")
)
operation_group = create_operation_group(
content=[
{
"type": Operation.PURCHASE,
"amount": Decimal("-3.50"),
"article": article1,
"article_nb": 2,
},
{"type": Operation.PURCHASE, "article": article2, "article_nb": 2},
{"type": Operation.PURCHASE, "article": article2},
{"type": Operation.DEPOSIT, "amount": Decimal("10.00")},
{"type": Operation.WITHDRAW, "amount": Decimal("-1.00")},
{"type": Operation.EDIT, "amount": Decimal("7.00")},
]
)
self.assertEqual(operation_group.amount, Decimal("0.50"))
operation_list = list(operation_group.opes.all())
# Passed args: with purchase, article, article_nb, amount
self.assertEqual(operation_list[0].type, Operation.PURCHASE)
self.assertEqual(operation_list[0].article, article1)
self.assertEqual(operation_list[0].article_nb, 2)
self.assertEqual(operation_list[0].amount, Decimal("-3.50"))
# Passed args: with purchase, article, article_nb; without amount
self.assertEqual(operation_list[1].type, Operation.PURCHASE)
self.assertEqual(operation_list[1].article, article2)
self.assertEqual(operation_list[1].article_nb, 2)
self.assertEqual(operation_list[1].amount, Decimal("-8.00"))
# Passed args: with purchase, article; without article_nb, amount
self.assertEqual(operation_list[2].type, Operation.PURCHASE)
self.assertEqual(operation_list[2].article, article2)
self.assertEqual(operation_list[2].article_nb, 1)
self.assertEqual(operation_list[2].amount, Decimal("-4.00"))
# Passed args: with deposit, amount
self.assertEqual(operation_list[3].type, Operation.DEPOSIT)
self.assertEqual(operation_list[3].amount, Decimal("10.00"))
# Passed args: with withdraw, amount
self.assertEqual(operation_list[4].type, Operation.WITHDRAW)
self.assertEqual(operation_list[4].amount, Decimal("-1.00"))
# Passed args: with edit, amount
self.assertEqual(operation_list[5].type, Operation.EDIT)
self.assertEqual(operation_list[5].amount, Decimal("7.00"))

View file

@ -28,7 +28,16 @@ from ..models import (
TransferGroup, TransferGroup,
) )
from .testcases import ViewTestCaseMixin from .testcases import ViewTestCaseMixin
from .utils import create_team, create_user, get_perms, user_add_perms from .utils import (
create_checkout,
create_checkout_statement,
create_inventory_article,
create_operation_group,
create_team,
create_user,
get_perms,
user_add_perms,
)
class AccountListViewTests(ViewTestCaseMixin, TestCase): class AccountListViewTests(ViewTestCaseMixin, TestCase):
@ -2952,6 +2961,21 @@ class KPsulPerformOperationsViewTests(ViewTestCaseMixin, TestCase):
class KPsulCancelOperationsViewTests(ViewTestCaseMixin, TestCase): class KPsulCancelOperationsViewTests(ViewTestCaseMixin, TestCase):
"""
Test cases for kpsul_cancel_operations view.
To test valid requests, one should use '_assertResponseOk(response)' to get
hints about failure reasons, if any.
At least one test per operation type should test the complete response and
behavior (HTTP, WebSocket, object updates, and object creations)
Other tests of the same operation type can only assert the specific
behavior differences.
For invalid requests, response errors should be tested.
"""
url_name = "kfet.kpsul.cancel_operations" url_name = "kfet.kpsul.cancel_operations"
url_expected = "/k-fet/k-psul/cancel_operations" url_expected = "/k-fet/k-psul/cancel_operations"
@ -2960,8 +2984,790 @@ class KPsulCancelOperationsViewTests(ViewTestCaseMixin, TestCase):
auth_user = "team" auth_user = "team"
auth_forbidden = [None, "user"] auth_forbidden = [None, "user"]
def test_ok(self): with_liq = True
pass
def setUp(self):
super(KPsulCancelOperationsViewTests, self).setUp()
self.checkout = create_checkout(balance=Decimal("100.00"))
# An Article, price=2.5, stock=20
self.article = Article.objects.create(
category=ArticleCategory.objects.create(name="Category"),
name="Article",
price=Decimal("2.5"),
stock=20,
)
# An Account, trigramme=000, balance=50
# Do not assume user is cof, nor not cof.
self.account = self.accounts["user"]
self.account.balance = Decimal("50.00")
self.account.save()
# Mock consumer of K-Psul websocket to catch what we're sending
kpsul_consumer_patcher = mock.patch("kfet.consumers.KPsul")
self.kpsul_consumer_mock = kpsul_consumer_patcher.start()
self.addCleanup(kpsul_consumer_patcher.stop)
def _assertResponseOk(self, response):
"""
Asserts that status code of 'response' is 200, and returns the
deserialized content of the JSONResponse.
In case status code is not 200, it prints the content of "errors" of
the response.
"""
json_data = json.loads(getattr(response, "content", b"{}").decode("utf-8"))
try:
self.assertEqual(response.status_code, 200)
except AssertionError as exc:
msg = "Expected response is 200, got {}. Errors: {}".format(
response.status_code, json_data.get("errors")
)
raise AssertionError(msg) from exc
return json_data
def test_invalid_operation_not_int(self):
data = {"operations[]": ["a"]}
resp = self.client.post(self.url, data)
self.assertEqual(resp.status_code, 400)
json_data = json.loads(resp.content.decode("utf-8"))
self.assertEqual(json_data["errors"], {})
def test_invalid_operation_not_exist(self):
data = {"operations[]": ["1000"]}
resp = self.client.post(self.url, data)
self.assertEqual(resp.status_code, 400)
json_data = json.loads(resp.content.decode("utf-8"))
self.assertEqual(json_data["errors"], {"opes_notexisting": [1000]})
@mock.patch("django.utils.timezone.now")
def test_purchase(self, now_mock):
now_mock.return_value = self.now
group = create_operation_group(
on_acc=self.account,
checkout=self.checkout,
content=[
{"type": Operation.PURCHASE, "article": self.article, "article_nb": 2}
],
)
operation = group.opes.get()
now_mock.return_value += timedelta(seconds=15)
data = {"operations[]": [str(operation.pk)]}
resp = self.client.post(self.url, data)
json_data = self._assertResponseOk(resp)
group = OperationGroup.objects.get()
self.assertDictEqual(
group.__dict__,
{
"_state": mock.ANY,
"amount": Decimal("0.00"),
"at": mock.ANY,
"checkout_id": self.checkout.pk,
"comment": "",
"id": mock.ANY,
"is_cof": False,
"on_acc_id": self.account.pk,
"valid_by_id": None,
},
)
operation = Operation.objects.get()
self.assertDictEqual(
operation.__dict__,
{
"_state": mock.ANY,
"addcost_amount": None,
"addcost_for_id": None,
"amount": Decimal("-5.00"),
"article_id": self.article.pk,
"article_nb": 2,
"canceled_at": self.now + timedelta(seconds=15),
"canceled_by_id": None,
"group_id": group.pk,
"id": mock.ANY,
"type": Operation.PURCHASE,
},
)
self.assertDictEqual(
json_data, {"canceled": [operation.pk], "errors": {}, "warnings": {}}
)
self.account.refresh_from_db()
self.assertEqual(self.account.balance, Decimal("55.00"))
self.article.refresh_from_db()
self.assertEqual(self.article.stock, 22)
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("100.00"))
self.kpsul_consumer_mock.group_send.assert_called_with(
"kfet.kpsul",
{
"opegroups": [
{
"cancellation": True,
"id": group.pk,
"amount": Decimal("0.00"),
"is_cof": False,
}
],
"opes": [
{
"cancellation": True,
"id": operation.pk,
"canceled_by__trigramme": None,
"canceled_at": self.now + timedelta(seconds=15),
}
],
"checkouts": [],
"articles": [{"id": self.article.pk, "stock": 22}],
},
)
def test_purchase_with_addcost(self):
# TODO(AD): L'état de la balance du compte destinataire de la majoration ne
# devrait pas empêcher l'annulation d'une opération.
addcost_user = create_user(
"addcost", "ADD", account_attrs={"balance": Decimal("10.00")}
)
addcost_account = addcost_user.profile.account_kfet
group = create_operation_group(
on_acc=self.account,
checkout=self.checkout,
content=[
{
"type": Operation.PURCHASE,
"article": self.article,
"article_nb": 2,
"amount": Decimal("-6.00"),
"addcost_amount": Decimal("1.00"),
"addcost_for": addcost_account,
}
],
)
operation = group.opes.get()
data = {"operations[]": [str(operation.pk)]}
resp = self.client.post(self.url, data)
self._assertResponseOk(resp)
self.account.refresh_from_db()
self.assertEqual(self.account.balance, Decimal("56.00"))
addcost_account.refresh_from_db()
self.assertEqual(addcost_account.balance, Decimal("9.00"))
def test_purchase_cash(self):
group = create_operation_group(
on_acc=self.accounts["liq"],
checkout=self.checkout,
content=[
{
"type": Operation.PURCHASE,
"article": self.article,
"article_nb": 2,
"amount": Decimal("-5.00"),
}
],
)
operation = group.opes.get()
data = {"operations[]": [str(operation.pk)]}
resp = self.client.post(self.url, data)
self._assertResponseOk(resp)
self.assertEqual(self.accounts["liq"].balance, Decimal("0.00"))
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("95.00"))
ws_data_checkouts = self.kpsul_consumer_mock.group_send.call_args[0][1][
"checkouts"
]
self.assertListEqual(
ws_data_checkouts, [{"id": self.checkout.pk, "balance": Decimal("95.00")}]
)
def test_purchase_cash_with_addcost(self):
# TODO(AD): L'état de la balance du compte destinataire de la majoration ne
# devrait pas empêcher l'annulation d'une opération.
addcost_user = create_user(
"addcost", "ADD", account_attrs={"balance": Decimal("10.00")}
)
addcost_account = addcost_user.profile.account_kfet
group = create_operation_group(
on_acc=self.accounts["liq"],
checkout=self.checkout,
content=[
{
"type": Operation.PURCHASE,
"article": self.article,
"article_nb": 2,
"amount": Decimal("-6.00"),
"addcost_amount": Decimal("1.00"),
"addcost_for": addcost_account,
}
],
)
operation = group.opes.get()
data = {"operations[]": [str(operation.pk)]}
resp = self.client.post(self.url, data)
self._assertResponseOk(resp)
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("94.00"))
addcost_account.refresh_from_db()
self.assertEqual(addcost_account.balance, Decimal("9.00"))
ws_data_checkouts = self.kpsul_consumer_mock.group_send.call_args[0][1][
"checkouts"
]
self.assertListEqual(
ws_data_checkouts, [{"id": self.checkout.pk, "balance": Decimal("94.00")}]
)
@mock.patch("django.utils.timezone.now")
def test_deposit(self, now_mock):
now_mock.return_value = self.now
group = create_operation_group(
on_acc=self.account,
checkout=self.checkout,
content=[{"type": Operation.DEPOSIT, "amount": Decimal("10.75")}],
)
operation = group.opes.get()
now_mock.return_value += timedelta(seconds=15)
data = {"operations[]": [str(operation.pk)]}
resp = self.client.post(self.url, data)
json_data = self._assertResponseOk(resp)
group = OperationGroup.objects.get()
self.assertDictEqual(
group.__dict__,
{
"_state": mock.ANY,
"amount": Decimal("0.00"),
"at": mock.ANY,
"checkout_id": self.checkout.pk,
"comment": "",
"id": mock.ANY,
"is_cof": False,
"on_acc_id": self.account.pk,
"valid_by_id": None,
},
)
operation = Operation.objects.get()
self.assertDictEqual(
operation.__dict__,
{
"_state": mock.ANY,
"addcost_amount": None,
"addcost_for_id": None,
"amount": Decimal("10.75"),
"article_id": None,
"article_nb": None,
"canceled_at": self.now + timedelta(seconds=15),
"canceled_by_id": None,
"group_id": group.pk,
"id": mock.ANY,
"type": Operation.DEPOSIT,
},
)
self.assertDictEqual(
json_data, {"canceled": [operation.pk], "errors": {}, "warnings": {}}
)
self.account.refresh_from_db()
self.assertEqual(self.account.balance, Decimal("39.25"))
self.article.refresh_from_db()
self.assertEqual(self.article.stock, 20)
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("89.25"))
self.kpsul_consumer_mock.group_send.assert_called_with(
"kfet.kpsul",
{
"opegroups": [
{
"cancellation": True,
"id": group.pk,
"amount": Decimal("0.00"),
"is_cof": False,
}
],
"opes": [
{
"cancellation": True,
"id": operation.pk,
"canceled_by__trigramme": None,
"canceled_at": self.now + timedelta(seconds=15),
}
],
"checkouts": [{"id": self.checkout.pk, "balance": Decimal("89.25")}],
"articles": [],
},
)
@mock.patch("django.utils.timezone.now")
def test_withdraw(self, now_mock):
now_mock.return_value = self.now
group = create_operation_group(
on_acc=self.account,
checkout=self.checkout,
content=[{"type": Operation.WITHDRAW, "amount": Decimal("-10.75")}],
)
operation = group.opes.get()
now_mock.return_value += timedelta(seconds=15)
data = {"operations[]": [str(operation.pk)]}
resp = self.client.post(self.url, data)
json_data = self._assertResponseOk(resp)
group = OperationGroup.objects.get()
self.assertDictEqual(
group.__dict__,
{
"_state": mock.ANY,
"amount": Decimal("0.00"),
"at": mock.ANY,
"checkout_id": self.checkout.pk,
"comment": "",
"id": mock.ANY,
"is_cof": False,
"on_acc_id": self.account.pk,
"valid_by_id": None,
},
)
operation = Operation.objects.get()
self.assertDictEqual(
operation.__dict__,
{
"_state": mock.ANY,
"addcost_amount": None,
"addcost_for_id": None,
"amount": Decimal("-10.75"),
"article_id": None,
"article_nb": None,
"canceled_at": self.now + timedelta(seconds=15),
"canceled_by_id": None,
"group_id": group.pk,
"id": mock.ANY,
"type": Operation.WITHDRAW,
},
)
self.assertDictEqual(
json_data, {"canceled": [operation.pk], "errors": {}, "warnings": {}}
)
self.account.refresh_from_db()
self.assertEqual(self.account.balance, Decimal("60.75"))
self.article.refresh_from_db()
self.assertEqual(self.article.stock, 20)
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("110.75"))
self.kpsul_consumer_mock.group_send.assert_called_with(
"kfet.kpsul",
{
"opegroups": [
{
"cancellation": True,
"id": group.pk,
"amount": Decimal("0.00"),
"is_cof": False,
}
],
"opes": [
{
"cancellation": True,
"id": operation.pk,
"canceled_by__trigramme": None,
"canceled_at": self.now + timedelta(seconds=15),
}
],
"checkouts": [{"id": self.checkout.pk, "balance": Decimal("110.75")}],
"articles": [],
},
)
@mock.patch("django.utils.timezone.now")
def test_edit(self, now_mock):
now_mock.return_value = self.now
group = create_operation_group(
on_acc=self.account,
checkout=self.checkout,
content=[{"type": Operation.EDIT, "amount": Decimal("-10.75")}],
)
operation = group.opes.get()
now_mock.return_value += timedelta(seconds=15)
data = {"operations[]": [str(operation.pk)]}
resp = self.client.post(self.url, data)
json_data = self._assertResponseOk(resp)
group = OperationGroup.objects.get()
self.assertDictEqual(
group.__dict__,
{
"_state": mock.ANY,
"amount": Decimal("0.00"),
"at": mock.ANY,
"checkout_id": self.checkout.pk,
"comment": "",
"id": mock.ANY,
"is_cof": False,
"on_acc_id": self.account.pk,
"valid_by_id": None,
},
)
operation = Operation.objects.get()
self.assertDictEqual(
operation.__dict__,
{
"_state": mock.ANY,
"addcost_amount": None,
"addcost_for_id": None,
"amount": Decimal("-10.75"),
"article_id": None,
"article_nb": None,
"canceled_at": self.now + timedelta(seconds=15),
"canceled_by_id": None,
"group_id": group.pk,
"id": mock.ANY,
"type": Operation.EDIT,
},
)
self.assertDictEqual(
json_data, {"canceled": [operation.pk], "errors": {}, "warnings": {}}
)
self.account.refresh_from_db()
self.assertEqual(self.account.balance, Decimal("60.75"))
self.article.refresh_from_db()
self.assertEqual(self.article.stock, 20)
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("100.00"))
self.kpsul_consumer_mock.group_send.assert_called_with(
"kfet.kpsul",
{
"opegroups": [
{
"cancellation": True,
"id": group.pk,
"amount": Decimal("0.00"),
"is_cof": False,
}
],
"opes": [
{
"cancellation": True,
"id": operation.pk,
"canceled_by__trigramme": None,
"canceled_at": self.now + timedelta(seconds=15),
}
],
"checkouts": [],
"articles": [],
},
)
@mock.patch("django.utils.timezone.now")
def test_old_operations(self, now_mock):
kfet_config.set(cancel_duration=timedelta(minutes=10))
user_add_perms(self.users["team"], ["kfet.cancel_old_operations"])
now_mock.return_value = self.now
group = create_operation_group(
at=self.now,
on_acc=self.account,
checkout=self.checkout,
content=[{"type": Operation.WITHDRAW, "amount": Decimal("-10.75")}],
)
operation = group.opes.get()
now_mock.return_value += timedelta(minutes=10, seconds=1)
data = {"operations[]": [str(operation.pk)]}
resp = self.client.post(self.url, data)
json_data = self._assertResponseOk(resp)
self.assertEqual(len(json_data["canceled"]), 1)
@mock.patch("django.utils.timezone.now")
def test_invalid_old_operations_requires_perm(self, now_mock):
kfet_config.set(cancel_duration=timedelta(minutes=10))
now_mock.return_value = self.now
group = create_operation_group(
at=self.now,
on_acc=self.account,
checkout=self.checkout,
content=[{"type": Operation.WITHDRAW, "amount": Decimal("-10.75")}],
)
operation = group.opes.get()
now_mock.return_value += timedelta(minutes=10, seconds=1)
data = {"operations[]": [str(operation.pk)]}
resp = self.client.post(self.url, data)
self.assertEqual(resp.status_code, 403)
json_data = json.loads(resp.content.decode("utf-8"))
self.assertEqual(
json_data["errors"],
{"missing_perms": ["Annuler des commandes non récentes"]},
)
def test_already_canceled(self):
group = create_operation_group(
on_acc=self.account,
checkout=self.checkout,
content=[
{
"type": Operation.WITHDRAW,
"amount": Decimal("-10.75"),
"canceled_at": timezone.now(),
}
],
)
operation = group.opes.get()
data = {"operations[]": [str(operation.pk)]}
resp = self.client.post(self.url, data)
json_data = self._assertResponseOk(resp)
self.assertDictEqual(
json_data["warnings"], {"already_canceled": [operation.pk]}
)
self.account.refresh_from_db()
self.assertEqual(self.account.balance, Decimal("50.00"))
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("100.00"))
@mock.patch("django.utils.timezone.now")
def test_checkout_before_last_statement(self, now_mock):
now_mock.return_value = self.now
group = create_operation_group(
at=self.now,
on_acc=self.account,
checkout=self.checkout,
content=[{"type": Operation.WITHDRAW, "amount": Decimal("-10.75")}],
)
operation = group.opes.get()
now_mock.return_value += timedelta(seconds=30)
create_checkout_statement(checkout=self.checkout)
now_mock.return_value += timedelta(seconds=30)
data = {"operations[]": [str(operation.pk)]}
resp = self.client.post(self.url, data)
json_data = self._assertResponseOk(resp)
self.assertEqual(len(json_data["canceled"]), 1)
self.account.refresh_from_db()
self.assertEqual(self.account.balance, Decimal("60.75"))
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("100.00"))
@mock.patch("django.utils.timezone.now")
def test_article_before_last_inventory(self, now_mock):
now_mock.return_value = self.now
group = create_operation_group(
at=self.now,
on_acc=self.account,
checkout=self.checkout,
content=[
{"type": Operation.PURCHASE, "article": self.article, "article_nb": 2}
],
)
operation = group.opes.get()
now_mock.return_value += timedelta(seconds=30)
create_inventory_article(article=self.article)
now_mock.return_value += timedelta(seconds=30)
data = {"operations[]": [str(operation.pk)]}
resp = self.client.post(self.url, data)
json_data = self._assertResponseOk(resp)
self.assertEqual(len(json_data["canceled"]), 1)
self.account.refresh_from_db()
self.assertEqual(self.account.balance, Decimal("55.00"))
self.article.refresh_from_db()
self.assertEqual(self.article.stock, 20)
def test_negative(self):
kfet_config.set(overdraft_amount=Decimal("40.00"))
user_add_perms(self.users["team"], ["kfet.perform_negative_operations"])
self.account.balance = Decimal("-20.00")
self.account.save()
group = create_operation_group(
on_acc=self.account,
checkout=self.checkout,
content=[{"type": Operation.DEPOSIT, "amount": Decimal("10.75")}],
)
operation = group.opes.get()
data = {"operations[]": [str(operation.pk)]}
resp = self.client.post(self.url, data)
json_data = self._assertResponseOk(resp)
self.assertEqual(len(json_data["canceled"]), 1)
self.account.refresh_from_db()
self.assertEqual(self.account.balance, Decimal("-30.75"))
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("89.25"))
def test_invalid_negative_above_thresholds(self):
kfet_config.set(overdraft_amount=Decimal("5.00"))
user_add_perms(self.users["team"], ["kfet.perform_negative_operations"])
self.account.balance = Decimal("-20.00")
self.account.save()
group = create_operation_group(
on_acc=self.account,
checkout=self.checkout,
content=[{"type": Operation.DEPOSIT, "amount": Decimal("10.75")}],
)
operation = group.opes.get()
data = {"operations[]": [str(operation.pk)]}
resp = self.client.post(self.url, data)
self.assertEqual(resp.status_code, 403)
json_data = json.loads(resp.content.decode("utf-8"))
self.assertEqual(json_data["errors"], {"negative": [self.account.trigramme]})
def test_invalid_negative_requires_perms(self):
kfet_config.set(overdraft_amount=Decimal("40.00"))
self.account.balance = Decimal("-20.00")
self.account.save()
group = create_operation_group(
on_acc=self.account,
checkout=self.checkout,
content=[{"type": Operation.DEPOSIT, "amount": Decimal("10.75")}],
)
operation = group.opes.get()
data = {"operations[]": [str(operation.pk)]}
resp = self.client.post(self.url, data)
self.assertEqual(resp.status_code, 403)
json_data = json.loads(resp.content.decode("utf-8"))
self.assertEqual(
json_data["errors"],
{"missing_perms": ["Enregistrer des commandes en négatif"]},
)
def test_partial_0(self):
group = create_operation_group(
on_acc=self.account,
checkout=self.checkout,
content=[
{"type": Operation.PURCHASE, "article": self.article, "article_nb": 2},
{"type": Operation.DEPOSIT, "amount": Decimal("10.75")},
{"type": Operation.EDIT, "amount": Decimal("-6.00")},
{
"type": Operation.WITHDRAW,
"amount": Decimal("-10.75"),
"canceled_at": timezone.now(),
},
],
)
operation1 = group.opes.get(type=Operation.PURCHASE)
operation2 = group.opes.get(type=Operation.EDIT)
operation3 = group.opes.get(type=Operation.WITHDRAW)
data = {
"operations[]": [str(operation1.pk), str(operation2.pk), str(operation3.pk)]
}
resp = self.client.post(self.url, data)
json_data = self._assertResponseOk(resp)
group.refresh_from_db()
self.assertEqual(group.amount, Decimal("10.75"))
self.assertEqual(group.opes.exclude(canceled_at=None).count(), 3)
self.assertDictEqual(
json_data,
{
"canceled": [operation1.pk, operation2.pk],
"warnings": {"already_canceled": [operation3.pk]},
"errors": {},
},
)
self.account.refresh_from_db()
self.assertEqual(self.account.balance, Decimal("61.00"))
self.article.refresh_from_db()
self.assertEqual(self.article.stock, 22)
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("100.00"))
def test_multi_0(self):
group1 = create_operation_group(
on_acc=self.account,
checkout=self.checkout,
content=[
{"type": Operation.PURCHASE, "article": self.article, "article_nb": 2},
{"type": Operation.DEPOSIT, "amount": Decimal("10.75")},
{"type": Operation.EDIT, "amount": Decimal("-6.00")},
],
)
operation11 = group1.opes.get(type=Operation.PURCHASE)
group2 = create_operation_group(
on_acc=self.account,
checkout=self.checkout,
content=[
{"type": Operation.PURCHASE, "article": self.article, "article_nb": 5},
{"type": Operation.DEPOSIT, "amount": Decimal("3.00")},
],
)
operation21 = group2.opes.get(type=Operation.PURCHASE)
operation22 = group2.opes.get(type=Operation.DEPOSIT)
data = {
"operations[]": [
str(operation11.pk),
str(operation21.pk),
str(operation22.pk),
]
}
resp = self.client.post(self.url, data)
json_data = self._assertResponseOk(resp)
group1.refresh_from_db()
self.assertEqual(group1.amount, Decimal("4.75"))
self.assertEqual(group1.opes.exclude(canceled_at=None).count(), 1)
group2.refresh_from_db()
self.assertEqual(group2.amount, Decimal(0))
self.assertEqual(group2.opes.exclude(canceled_at=None).count(), 2)
self.assertEqual(len(json_data["canceled"]), 3)
self.account.refresh_from_db()
self.assertEqual(self.account.balance, Decimal("64.50"))
self.article.refresh_from_db()
self.assertEqual(self.article.stock, 27)
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("97.00"))
class KPsulArticlesData(ViewTestCaseMixin, TestCase): class KPsulArticlesData(ViewTestCaseMixin, TestCase):

View file

@ -1,7 +1,21 @@
from datetime import timedelta
from decimal import Decimal
from django.contrib.auth import get_user_model from django.contrib.auth import get_user_model
from django.contrib.auth.models import Permission from django.contrib.auth.models import Permission
from django.utils import timezone
from ..models import Account from ..models import (
Account,
Article,
ArticleCategory,
Checkout,
CheckoutStatement,
Inventory,
InventoryArticle,
Operation,
OperationGroup,
)
User = get_user_model() User = get_user_model()
@ -184,3 +198,180 @@ def user_add_perms(user, perms_labels):
# it to avoid using of the previous permissions cache. # it to avoid using of the previous permissions cache.
# https://docs.djangoproject.com/en/dev/topics/auth/default/#permission-caching # https://docs.djangoproject.com/en/dev/topics/auth/default/#permission-caching
return User.objects.get(pk=user.pk) return User.objects.get(pk=user.pk)
def create_checkout(**kwargs):
"""
Factory to create a checkout.
See defaults for unpassed arguments in code below.
"""
if "created_by" not in kwargs or "created_by_id" not in kwargs:
try:
team_account = Account.objects.get(cofprofile__user__username="team")
except Account.DoesNotExist:
team_account = create_team().profile.account_kfet
kwargs["created_by"] = team_account
kwargs.setdefault("name", "Checkout")
kwargs.setdefault("valid_from", timezone.now() - timedelta(days=14))
kwargs.setdefault("valid_to", timezone.now() - timedelta(days=14))
return Checkout.objects.create(**kwargs)
def create_operation_group(content=None, **kwargs):
"""
Factory to create an OperationGroup and a set of related Operation.
It aims to get objects for testing purposes with minimal setup, and
preserving consistency.
For this, it uses, and creates if necessary, default objects for unpassed
arguments.
Args:
content: list of dict
Describe set of Operation to create along the OperationGroup.
Each item is passed to the Operation factory.
kwargs:
Used to control OperationGroup creation.
"""
if content is None:
content = []
# Prepare OperationGroup creation.
# Set 'checkout' for OperationGroup if unpassed.
if "checkout" not in kwargs and "checkout_id" not in kwargs:
try:
checkout = Checkout.objects.get(name="Checkout")
except Checkout.DoesNotExist:
checkout = create_checkout()
kwargs["checkout"] = checkout
# Set 'on_acc' for OperationGroup if unpassed.
if "on_acc" not in kwargs and "on_acc_id" not in kwargs:
try:
on_acc = Account.objects.get(cofprofile__user__username="user")
except Account.DoesNotExist:
on_acc = create_user().profile.account_kfet
kwargs["on_acc"] = on_acc
# Set 'is_cof' for OperationGroup if unpassed.
if "is_cof" not in kwargs:
# Use current is_cof status of 'on_acc'.
kwargs["is_cof"] = kwargs["on_acc"].cofprofile.is_cof
# Create OperationGroup.
group = OperationGroup.objects.create(**kwargs)
# We can now create objects referencing this OperationGroup.
# Process set of related Operation.
if content:
# Create them.
operation_list = []
for operation_kwargs in content:
operation = create_operation(group=group, **operation_kwargs)
operation_list.append(operation)
# Update OperationGroup accordingly, for consistency.
for operation in operation_list:
if not operation.canceled_at:
group.amount += operation.amount
group.save()
return group
def create_operation(**kwargs):
"""
Factory to create an Operation for testing purposes.
If you give a 'group' (OperationGroup), it won't update it, you have do
this "manually". Prefer using OperationGroup factory to get a consistent
group with operations.
"""
if "group" not in kwargs and "group_id" not in kwargs:
# To get a consistent OperationGroup (amount...) for the operation
# in-creation, prefer using create_operation_group factory with
# 'content'.
kwargs["group"] = create_operation_group()
if "type" not in kwargs:
raise RuntimeError("Can't create an Operation without 'type'.")
# Apply defaults for purchase
if kwargs["type"] == Operation.PURCHASE:
if "article" not in kwargs:
raise NotImplementedError(
"One could write a create_article factory. Right now, you must"
"pass an 'article'."
)
# Unpassed 'article_nb' defaults to 1.
kwargs.setdefault("article_nb", 1)
# Unpassed 'amount' will use current article price and quantity.
if "amount" not in kwargs:
if "addcost_for" in kwargs or "addcost_amount" in kwargs:
raise NotImplementedError(
"One could handle the case where 'amount' is missing and "
"addcost applies. Right now, please pass an 'amount'."
)
kwargs["amount"] = -kwargs["article"].price * kwargs["article_nb"]
return Operation.objects.create(**kwargs)
def create_checkout_statement(**kwargs):
if "checkout" not in kwargs:
kwargs["checkout"] = create_checkout()
if "by" not in kwargs:
try:
team_account = Account.objects.get(cofprofile__user__username="team")
except Account.DoesNotExist:
team_account = create_team().profile.account_kfet
kwargs["by"] = team_account
kwargs.setdefault("balance_new", kwargs["checkout"].balance)
kwargs.setdefault("balance_old", kwargs["checkout"].balance)
kwargs.setdefault("amount_taken", Decimal(0))
return CheckoutStatement.objects.create(**kwargs)
def create_article(**kwargs):
kwargs.setdefault("name", "Article")
kwargs.setdefault("price", Decimal("2.50"))
kwargs.setdefault("stock", 20)
if "category" not in kwargs:
kwargs["category"] = create_article_category()
return Article.objects.create(**kwargs)
def create_article_category(**kwargs):
kwargs.setdefault("name", "Category")
return ArticleCategory.objects.create(**kwargs)
def create_inventory(**kwargs):
if "by" not in kwargs:
try:
team_account = Account.objects.get(cofprofile__user__username="team")
except Account.DoesNotExist:
team_account = create_team().profile.account_kfet
kwargs["by"] = team_account
return Inventory.objects.create(**kwargs)
def create_inventory_article(**kwargs):
if "inventory" not in kwargs:
kwargs["inventory"] = create_inventory()
if "article" not in kwargs:
kwargs["article"] = create_article()
kwargs.setdefault("stock_old", kwargs["article"].stock)
kwargs.setdefault("stock_new", kwargs["article"].stock)
return InventoryArticle.objects.create(**kwargs)

View file

@ -1301,13 +1301,27 @@ def kpsul_cancel_operations(request):
stock=F("stock") + to_articles_stocks[article] stock=F("stock") + to_articles_stocks[article]
) )
# Need refresh from db cause we used update on querysets.
# Sort objects by pk to get deterministic responses.
opegroups_pk = [opegroup.pk for opegroup in to_groups_amounts]
opegroups = (
OperationGroup.objects.values("id", "amount", "is_cof")
.filter(pk__in=opegroups_pk)
.order_by("pk")
)
opes = sorted(opes)
checkouts_pk = [checkout.pk for checkout in to_checkouts_balances]
checkouts = (
Checkout.objects.values("id", "balance")
.filter(pk__in=checkouts_pk)
.order_by("pk")
)
articles_pk = [article.pk for articles in to_articles_stocks]
articles = Article.objects.values("id", "stock").filter(pk__in=articles_pk)
# Websocket data # Websocket data
websocket_data = {"opegroups": [], "opes": [], "checkouts": [], "articles": []} websocket_data = {"opegroups": [], "opes": [], "checkouts": [], "articles": []}
# Need refresh from db cause we used update on querysets
opegroups_pk = [opegroup.pk for opegroup in to_groups_amounts]
opegroups = OperationGroup.objects.values("id", "amount", "is_cof").filter(
pk__in=opegroups_pk
)
for opegroup in opegroups: for opegroup in opegroups:
websocket_data["opegroups"].append( websocket_data["opegroups"].append(
{ {
@ -1327,16 +1341,10 @@ def kpsul_cancel_operations(request):
"canceled_at": canceled_at, "canceled_at": canceled_at,
} }
) )
# Need refresh from db cause we used update on querysets
checkouts_pk = [checkout.pk for checkout in to_checkouts_balances]
checkouts = Checkout.objects.values("id", "balance").filter(pk__in=checkouts_pk)
for checkout in checkouts: for checkout in checkouts:
websocket_data["checkouts"].append( websocket_data["checkouts"].append(
{"id": checkout["id"], "balance": checkout["balance"]} {"id": checkout["id"], "balance": checkout["balance"]}
) )
# Need refresh from db cause we used update on querysets
articles_pk = [article.pk for articles in to_articles_stocks]
articles = Article.objects.values("id", "stock").filter(pk__in=articles_pk)
for article in articles: for article in articles:
websocket_data["articles"].append( websocket_data["articles"].append(
{"id": article["id"], "stock": article["stock"]} {"id": article["id"], "stock": article["stock"]}

0
petitscours/__init__.py Normal file
View file

View file

@ -4,7 +4,7 @@ from django.contrib.auth.models import User
from django.forms import ModelForm from django.forms import ModelForm
from django.forms.models import BaseInlineFormSet, inlineformset_factory from django.forms.models import BaseInlineFormSet, inlineformset_factory
from gestioncof.petits_cours_models import PetitCoursAbility, PetitCoursDemande from petitscours.models import PetitCoursAbility, PetitCoursDemande
class BaseMatieresFormSet(BaseInlineFormSet): class BaseMatieresFormSet(BaseInlineFormSet):

View file

@ -3,6 +3,7 @@ from functools import reduce
from django.contrib.auth.models import User from django.contrib.auth.models import User
from django.db import models from django.db import models
from django.db.models import Min from django.db.models import Min
from django.utils.functional import cached_property
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
@ -16,6 +17,7 @@ LEVELS_CHOICES = (
("prepa1styear", _("Prépa 1ère année / L1")), ("prepa1styear", _("Prépa 1ère année / L1")),
("prepa2ndyear", _("Prépa 2ème année / L2")), ("prepa2ndyear", _("Prépa 2ème année / L2")),
("licence3", _("Licence 3")), ("licence3", _("Licence 3")),
("master1", _("Master (1ère ou 2ème année)")),
("other", _("Autre (préciser dans les commentaires)")), ("other", _("Autre (préciser dans les commentaires)")),
) )
@ -27,6 +29,7 @@ class PetitCoursSubject(models.Model):
) )
class Meta: class Meta:
app_label = "gestioncof"
verbose_name = "Matière de petits cours" verbose_name = "Matière de petits cours"
verbose_name_plural = "Matières des petits cours" verbose_name_plural = "Matières des petits cours"
@ -45,6 +48,7 @@ class PetitCoursAbility(models.Model):
agrege = models.BooleanField(_("Agrégé"), default=False) agrege = models.BooleanField(_("Agrégé"), default=False)
class Meta: class Meta:
app_label = "gestioncof"
verbose_name = "Compétence petits cours" verbose_name = "Compétence petits cours"
verbose_name_plural = "Compétences des petits cours" verbose_name_plural = "Compétences des petits cours"
@ -53,6 +57,12 @@ class PetitCoursAbility(models.Model):
self.user.username, self.matiere, self.niveau self.user.username, self.matiere, self.niveau
) )
@cached_property
def counter(self) -> int:
"""Le compteur d'attribution associé au professeur pour cette matière."""
return PetitCoursAttributionCounter.get_uptodate(self.user, self.matiere).count
class PetitCoursDemande(models.Model): class PetitCoursDemande(models.Model):
name = models.CharField(_("Nom/prénom"), max_length=200) name = models.CharField(_("Nom/prénom"), max_length=200)
@ -126,7 +136,44 @@ class PetitCoursDemande(models.Model):
candidates = candidates.order_by("?").select_related().all() candidates = candidates.order_by("?").select_related().all()
yield (matiere, candidates) yield (matiere, candidates)
def get_proposals(self, *, max_candidates: int = None, redo: bool = False):
"""Calcule une proposition de profs pour la demande.
Args:
max_candidates (optionnel; défaut: `None`): Le nombre maximum de
candidats à proposer par demande. Si `None` ou non spécifié,
il n'y a pas de limite.
redo (optionel; défaut: `False`): Détermine si on re-calcule les
propositions pour la demande (les professeurs à qui on a déjà
proposé cette demande sont exclus).
Returns:
proposals: Le dictionnaire qui associe à chaque matière la liste
des professeurs proposés. Les matières pour lesquelles aucun
professeur n'est disponible ne sont pas présentes dans
`proposals`.
unsatisfied: La liste des matières pour lesquelles aucun
professeur n'est disponible.
"""
proposals = {}
unsatisfied = []
for matiere, candidates in self.get_candidates(redo=redo):
if not candidates:
unsatisfied.append(matiere)
else:
proposals[matiere] = matiere_proposals = []
candidates = sorted(candidates, key=lambda c: c.counter)
candidates = candidates[:max_candidates]
for candidate in candidates[:max_candidates]:
matiere_proposals.append(candidate.user)
return proposals, unsatisfied
class Meta: class Meta:
app_label = "gestioncof"
verbose_name = "Demande de petits cours" verbose_name = "Demande de petits cours"
verbose_name_plural = "Demandes de petits cours" verbose_name_plural = "Demandes de petits cours"
@ -147,6 +194,7 @@ class PetitCoursAttribution(models.Model):
selected = models.BooleanField(_("Sélectionné par le demandeur"), default=False) selected = models.BooleanField(_("Sélectionné par le demandeur"), default=False)
class Meta: class Meta:
app_label = "gestioncof"
verbose_name = "Attribution de petits cours" verbose_name = "Attribution de petits cours"
verbose_name_plural = "Attributions de petits cours" verbose_name_plural = "Attributions de petits cours"
@ -182,6 +230,7 @@ class PetitCoursAttributionCounter(models.Model):
return counter return counter
class Meta: class Meta:
app_label = "gestioncof"
verbose_name = "Compteur d'attribution de petits cours" verbose_name = "Compteur d'attribution de petits cours"
verbose_name_plural = "Compteurs d'attributions de petits cours" verbose_name_plural = "Compteurs d'attributions de petits cours"

View file

@ -1,11 +1,11 @@
{% extends "base_title_petitscours.html" %} {% extends "petitscours/base_title.html" %}
{% load staticfiles %} {% load staticfiles %}
{% block page_size %}col-sm-8{% endblock %} {% block page_size %}col-sm-8{% endblock %}
{% block realcontent %} {% block realcontent %}
<h2>Demande de petits cours</h2> <h2>Demande de petits cours</h2>
{% include "details_demande_petit_cours_infos.html" %} {% include "petitscours/details_demande_infos.html" %}
<hr /> <hr />
<table class="table table-striped"> <table class="table table-striped">
<tr><td><strong>Traitée</strong></td><td> <img src="{% if demande.traitee %}{% static "images/yes.png" %}{% else %}{% static "images/no.png" %}{% endif %}" /></td></tr> <tr><td><strong>Traitée</strong></td><td> <img src="{% if demande.traitee %}{% static "images/yes.png" %}{% else %}{% static "images/no.png" %}{% endif %}" /></td></tr>

View file

@ -1,4 +1,4 @@
{% extends "base_title_petitscours.html" %} {% extends "petitscours/base_title.html" %}
{% load staticfiles %} {% load staticfiles %}
{% block realcontent %} {% block realcontent %}

View file

@ -94,7 +94,7 @@ var django = {
<form class="form-horizontal petit-cours_form" id="bda_form" method="post" action="{% url 'petits-cours-inscription' %}"> <form class="form-horizontal petit-cours_form" id="bda_form" method="post" action="{% url 'petits-cours-inscription' %}">
{% csrf_token %} {% csrf_token %}
<div class="table-top" style="margin-left:0px; margin-right:0px; font-size: 1.25em; font-weight: bold; color: #DE826B;"><input type="checkbox" name="receive_proposals" {% if receive_proposals %}checked="checked"{% endif %} /> Recevoir des propositions de petits cours</div> <div class="table-top" style="margin-left:0px; margin-right:0px; font-size: 1.25em; font-weight: bold; color: #DE826B;"><input type="checkbox" name="receive_proposals" {% if receive_proposals %}checked="checked"{% endif %} /> Recevoir des propositions de petits cours</div>
{% include "inscription-petit-cours-formset.html" %} {% include "petitscours/inscription_formset.html" %}
<div class="inscription-bottom"> <div class="inscription-bottom">
<input type="button" class="btn btn-default pull-right" value="Ajouter une autre matière" id="add_more" /> <input type="button" class="btn btn-default pull-right" value="Ajouter une autre matière" id="add_more" />
<script> <script>

View file

@ -1,8 +1,8 @@
{% extends "base_title_petitscours.html" %} {% extends "petitscours/base_title.html" %}
{% block realcontent %} {% block realcontent %}
<h2>Traitement de la demande de petits cours {{ demande.id }}</h2> <h2>Traitement de la demande de petits cours {{ demande.id }}</h2>
{% include "details_demande_petit_cours_infos.html" %} {% include "petitscours/details_demande_infos.html" %}
<hr /> <hr />
{% if errors %} {% if errors %}
<div class="error"> <div class="error">

View file

@ -1,9 +1,9 @@
{% extends "base_title_petitscours.html" %} {% extends "petitscours/base_title.html" %}
{% load staticfiles %} {% load staticfiles %}
{% block realcontent %} {% block realcontent %}
<h2>Traitement de la demande de petits cours {{ demande.id }}</h2> <h2>Traitement de la demande de petits cours {{ demande.id }}</h2>
{% include "details_demande_petit_cours_infos.html" %} {% include "petitscours/details_demande_infos.html" %}
<hr /> <hr />
<div class="error"> <div class="error">
Attention: demande de petits cours spécifiant le niveau "Autre niveau": choisissez les candidats correspondant aux remarques de la demande. S'il y a moins de 3 candidats adaptés, ne mettre que ceux qui conviennent, pas besoin de faire du bourrage :) Attention: demande de petits cours spécifiant le niveau "Autre niveau": choisissez les candidats correspondant aux remarques de la demande. S'il y a moins de 3 candidats adaptés, ne mettre que ceux qui conviennent, pas besoin de faire du bourrage :)

View file

@ -1,4 +1,4 @@
{% extends "base_title_petitscours.html" %} {% extends "petitscours/base_title.html" %}
{% block realcontent %} {% block realcontent %}
<h2>Traitement de la demande de petits cours {{ demande.id }}</h2> <h2>Traitement de la demande de petits cours {{ demande.id }}</h2>

View file

View file

@ -0,0 +1,39 @@
import os
from django.conf import settings
from django.core.management import call_command
from petitscours.models import (
PetitCoursAbility,
PetitCoursAttributionCounter,
PetitCoursDemande,
PetitCoursSubject,
)
def create_petitcours_ability(**kwargs):
if "user" not in kwargs:
kwargs["user"] = create_user()
if "matiere" not in kwargs:
kwargs["matiere"] = create_petitcours_subject()
if "niveau" not in kwargs:
kwargs["niveau"] = "college"
ability = PetitCoursAbility.objects.create(**kwargs)
PetitCoursAttributionCounter.get_uptodate(ability.user, ability.matiere)
return ability
def create_petitcours_demande(**kwargs):
return PetitCoursDemande.objects.create(**kwargs)
def create_petitcours_subject(**kwargs):
return PetitCoursSubject.objects.create(**kwargs)
class PetitCoursTestHelpers:
def require_custommails(self):
data_file = os.path.join(
settings.BASE_DIR, "gestioncof", "management", "data", "custommail.json"
)
call_command("syncmails", data_file, verbosity=0)

37
petitscours/urls.py Normal file
View file

@ -0,0 +1,37 @@
from django.conf.urls import url
from gestioncof.decorators import buro_required
from petitscours import views
from petitscours.views import DemandeDetailView, DemandeListView
urlpatterns = [
url(r"^inscription$", views.inscription, name="petits-cours-inscription"),
url(r"^demande$", views.demande, name="petits-cours-demande"),
url(
r"^demande-raw$",
views.demande,
kwargs={"raw": True},
name="petits-cours-demande-raw",
),
url(
r"^demandes$",
buro_required(DemandeListView.as_view()),
name="petits-cours-demandes-list",
),
url(
r"^demandes/(?P<pk>\d+)$",
buro_required(DemandeDetailView.as_view()),
name="petits-cours-demande-details",
),
url(
r"^demandes/(?P<demande_id>\d+)/traitement$",
views.traitement,
name="petits-cours-demande-traitement",
),
url(
r"^demandes/(?P<demande_id>\d+)/retraitement$",
views.traitement,
kwargs={"redo": True},
name="petits-cours-demande-retraitement",
),
]

View file

@ -14,8 +14,8 @@ from django.views.generic import DetailView, ListView
from gestioncof.decorators import buro_required from gestioncof.decorators import buro_required
from gestioncof.models import CofProfile from gestioncof.models import CofProfile
from gestioncof.petits_cours_forms import DemandeForm, MatieresFormSet from petitscours.forms import DemandeForm, MatieresFormSet
from gestioncof.petits_cours_models import ( from petitscours.models import (
PetitCoursAbility, PetitCoursAbility,
PetitCoursAttribution, PetitCoursAttribution,
PetitCoursAttributionCounter, PetitCoursAttributionCounter,
@ -27,7 +27,7 @@ class DemandeListView(ListView):
queryset = PetitCoursDemande.objects.prefetch_related("matieres").order_by( queryset = PetitCoursDemande.objects.prefetch_related("matieres").order_by(
"traitee", "-id" "traitee", "-id"
) )
template_name = "petits_cours_demandes_list.html" template_name = "petitscours/demande_list.html"
paginate_by = 20 paginate_by = 20
@ -36,7 +36,7 @@ class DemandeDetailView(DetailView):
queryset = PetitCoursDemande.objects.prefetch_related( queryset = PetitCoursDemande.objects.prefetch_related(
"petitcoursattribution_set", "matieres" "petitcoursattribution_set", "matieres"
) )
template_name = "gestioncof/details_demande_petit_cours.html" template_name = "petitscours/demande_detail.html"
context_object_name = "demande" context_object_name = "demande"
def get_context_data(self, **kwargs): def get_context_data(self, **kwargs):
@ -53,64 +53,27 @@ def traitement(request, demande_id, redo=False):
return _traitement_other(request, demande, redo) return _traitement_other(request, demande, redo)
if request.method == "POST": if request.method == "POST":
return _traitement_post(request, demande) return _traitement_post(request, demande)
proposals = {} proposals, unsatisfied = demande.get_proposals(redo=redo, max_candidates=3)
proposed_for = {} return _finalize_traitement(request, demande, proposals, unsatisfied, redo)
unsatisfied = []
attribdata = {}
for matiere, candidates in demande.get_candidates(redo):
if candidates:
tuples = []
for candidate in candidates:
user = candidate.user
tuples.append(
(
candidate,
PetitCoursAttributionCounter.get_uptodate(user, matiere),
)
)
tuples = sorted(tuples, key=lambda c: c[1].count)
candidates, _ = zip(*tuples)
candidates = candidates[0 : min(3, len(candidates))]
attribdata[matiere.id] = []
proposals[matiere] = []
for candidate in candidates:
user = candidate.user
proposals[matiere].append(user)
attribdata[matiere.id].append(user.id)
if user not in proposed_for:
proposed_for[user] = [matiere]
else:
proposed_for[user].append(matiere)
else:
unsatisfied.append(matiere)
return _finalize_traitement(
request, demande, proposals, proposed_for, unsatisfied, attribdata, redo
)
@buro_required
def retraitement(request, demande_id):
return traitement(request, demande_id, redo=True)
def _finalize_traitement( def _finalize_traitement(
request, request, demande, proposals, unsatisfied, redo=False, errors=None
demande,
proposals,
proposed_for,
unsatisfied,
attribdata,
redo=False,
errors=None,
): ):
proposals = proposals.items() attribdata = [
proposed_for = proposed_for.items() (matiere.id, [user.id for user in users])
attribdata = list(attribdata.items()) for matiere, users in proposals.items()
]
proposed_for = {}
for matiere, users in proposals.items():
for user in users:
proposed_for.setdefault(user, []).append(matiere)
proposed_mails = _generate_eleve_email(demande, proposed_for) proposed_mails = _generate_eleve_email(demande, proposed_for)
mainmail = render_custom_mail( mainmail = render_custom_mail(
"petits-cours-mail-demandeur", "petits-cours-mail-demandeur",
{ {
"proposals": proposals, "proposals": proposals.items(),
"unsatisfied": unsatisfied, "unsatisfied": unsatisfied,
"extra": '<textarea name="extra" ' "extra": '<textarea name="extra" '
'style="width:99%; height: 90px;">' 'style="width:99%; height: 90px;">'
@ -122,12 +85,12 @@ def _finalize_traitement(
messages.error(request, error) messages.error(request, error)
return render( return render(
request, request,
"gestioncof/traitement_demande_petit_cours.html", "petitscours/traitement_demande.html",
{ {
"demande": demande, "demande": demande,
"unsatisfied": unsatisfied, "unsatisfied": unsatisfied,
"proposals": proposals, "proposals": proposals.items(),
"proposed_for": proposed_for, "proposed_for": proposed_for.items(),
"proposed_mails": proposed_mails, "proposed_mails": proposed_mails,
"mainmail": mainmail, "mainmail": mainmail,
"attribdata": json.dumps(attribdata), "attribdata": json.dumps(attribdata),
@ -144,7 +107,7 @@ def _generate_eleve_email(demande, proposed_for):
"petit-cours-mail-eleve", {"demande": demande, "matieres": matieres} "petit-cours-mail-eleve", {"demande": demande, "matieres": matieres}
), ),
) )
for user, matieres in proposed_for for user, matieres in proposed_for.items()
] ]
@ -152,15 +115,12 @@ def _traitement_other_preparing(request, demande):
redo = "redo" in request.POST redo = "redo" in request.POST
unsatisfied = [] unsatisfied = []
proposals = {} proposals = {}
proposed_for = {}
attribdata = {}
errors = [] errors = []
for matiere, candidates in demande.get_candidates(redo): for matiere, candidates in demande.get_candidates(redo):
if candidates: if candidates:
candidates = dict( candidates = dict(
[(candidate.user.id, candidate.user) for candidate in candidates] [(candidate.user.id, candidate.user) for candidate in candidates]
) )
attribdata[matiere.id] = []
proposals[matiere] = [] proposals[matiere] = []
for choice_id in range(min(3, len(candidates))): for choice_id in range(min(3, len(candidates))):
choice = int( choice = int(
@ -183,11 +143,6 @@ def _traitement_other_preparing(request, demande):
) )
continue continue
proposals[matiere].append(user) proposals[matiere].append(user)
attribdata[matiere.id].append(user.id)
if user not in proposed_for:
proposed_for[user] = [matiere]
else:
proposed_for[user].append(matiere)
if not proposals[matiere]: if not proposals[matiere]:
errors.append("Aucune proposition pour {!s}".format(matiere)) errors.append("Aucune proposition pour {!s}".format(matiere))
elif len(proposals[matiere]) < 3: elif len(proposals[matiere]) < 3:
@ -200,15 +155,7 @@ def _traitement_other_preparing(request, demande):
) )
else: else:
unsatisfied.append(matiere) unsatisfied.append(matiere)
return _finalize_traitement( return _finalize_traitement(request, demande, proposals, unsatisfied, errors=errors)
request,
demande,
proposals,
proposed_for,
unsatisfied,
attribdata,
errors=errors,
)
def _traitement_other(request, demande, redo): def _traitement_other(request, demande, redo):
@ -217,45 +164,14 @@ def _traitement_other(request, demande, redo):
return _traitement_other_preparing(request, demande) return _traitement_other_preparing(request, demande)
else: else:
return _traitement_post(request, demande) return _traitement_post(request, demande)
proposals = {} proposals, unsatisfied = demande.get_proposals(redo=redo)
proposed_for = {}
unsatisfied = []
attribdata = {}
for matiere, candidates in demande.get_candidates(redo):
if candidates:
tuples = []
for candidate in candidates:
user = candidate.user
tuples.append(
(
candidate,
PetitCoursAttributionCounter.get_uptodate(user, matiere),
)
)
tuples = sorted(tuples, key=lambda c: c[1].count)
candidates, _ = zip(*tuples)
attribdata[matiere.id] = []
proposals[matiere] = []
for candidate in candidates:
user = candidate.user
proposals[matiere].append(user)
attribdata[matiere.id].append(user.id)
if user not in proposed_for:
proposed_for[user] = [matiere]
else:
proposed_for[user].append(matiere)
else:
unsatisfied.append(matiere)
proposals = proposals.items()
proposed_for = proposed_for.items()
return render( return render(
request, request,
"gestioncof/traitement_demande_petit_cours_autre_niveau.html", "petitscours/traitement_demande_autre_niveau.html",
{ {
"demande": demande, "demande": demande,
"unsatisfied": unsatisfied, "unsatisfied": unsatisfied,
"proposals": proposals, "proposals": proposals.items(),
"proposed_for": proposed_for,
}, },
) )
@ -280,12 +196,10 @@ def _traitement_post(request, demande):
proposed_for[user] = [matiere] proposed_for[user] = [matiere]
else: else:
proposed_for[user].append(matiere) proposed_for[user].append(matiere)
proposals_list = proposals.items()
proposed_for = proposed_for.items()
proposed_mails = _generate_eleve_email(demande, proposed_for) proposed_mails = _generate_eleve_email(demande, proposed_for)
mainmail_object, mainmail_body = render_custom_mail( mainmail_object, mainmail_body = render_custom_mail(
"petits-cours-mail-demandeur", "petits-cours-mail-demandeur",
{"proposals": proposals_list, "unsatisfied": unsatisfied, "extra": extra}, {"proposals": proposals.items(), "unsatisfied": unsatisfied, "extra": extra},
) )
frommail = settings.MAIL_DATA["petits_cours"]["FROM"] frommail = settings.MAIL_DATA["petits_cours"]["FROM"]
bccaddress = settings.MAIL_DATA["petits_cours"]["BCC"] bccaddress = settings.MAIL_DATA["petits_cours"]["BCC"]
@ -314,8 +228,8 @@ def _traitement_post(request, demande):
connection = mail.get_connection(fail_silently=False) connection = mail.get_connection(fail_silently=False)
connection.send_messages(mails_to_send) connection.send_messages(mails_to_send)
with transaction.atomic(): with transaction.atomic():
for matiere in proposals: for matiere, users in proposals.items():
for rank, user in enumerate(proposals[matiere]): for rank, user in enumerate(users):
# TODO(AD): Prefer PetitCoursAttributionCounter.get_uptodate() # TODO(AD): Prefer PetitCoursAttributionCounter.get_uptodate()
counter = PetitCoursAttributionCounter.objects.get( counter = PetitCoursAttributionCounter.objects.get(
user=user, matiere=matiere user=user, matiere=matiere
@ -332,7 +246,7 @@ def _traitement_post(request, demande):
demande.save() demande.save()
return render( return render(
request, request,
"gestioncof/traitement_demande_petit_cours_success.html", "petitscours/traitement_demande_success.html",
{"demande": demande, "redo": redo}, {"demande": demande, "redo": redo},
) )
@ -362,7 +276,7 @@ def inscription(request):
formset = MatieresFormSet(instance=request.user) formset = MatieresFormSet(instance=request.user)
return render( return render(
request, request,
"inscription-petit-cours.html", "petitscours/inscription.html",
{ {
"formset": formset, "formset": formset,
"success": success, "success": success,
@ -373,7 +287,7 @@ def inscription(request):
@csrf_exempt @csrf_exempt
def demande(request): def demande(request, *, raw: bool = False):
success = False success = False
if request.method == "POST": if request.method == "POST":
form = DemandeForm(request.POST) form = DemandeForm(request.POST)
@ -382,21 +296,7 @@ def demande(request):
success = True success = True
else: else:
form = DemandeForm() form = DemandeForm()
return render( template_name = "petitscours/demande.html"
request, "demande-petit-cours.html", {"form": form, "success": success} if raw:
) template_name = "petitscours/demande_raw.html"
return render(request, template_name, {"form": form, "success": success})
@csrf_exempt
def demande_raw(request):
success = False
if request.method == "POST":
form = DemandeForm(request.POST)
if form.is_valid():
form.save()
success = True
else:
form = DemandeForm()
return render(
request, "demande-petit-cours-raw.html", {"form": form, "success": success}
)

View file

@ -3,7 +3,7 @@ Django==1.11.*
django-autocomplete-light==3.1.3 django-autocomplete-light==3.1.3
django-autoslug==1.9.3 django-autoslug==1.9.3
django-cas-ng==3.5.7 django-cas-ng==3.5.7
django-djconfig==0.5.3 django-djconfig==0.8.0
django-recaptcha==1.4.0 django-recaptcha==1.4.0
django-redis-cache==1.8.1 django-redis-cache==1.8.1
icalendar icalendar

View file

@ -4,6 +4,7 @@ source =
cof cof
gestioncof gestioncof
kfet kfet
petitscours
shared shared
utils utils
omit = omit =
@ -33,7 +34,7 @@ default_section = THIRDPARTY
force_grid_wrap = 0 force_grid_wrap = 0
include_trailing_comma = true include_trailing_comma = true
known_django = django known_django = django
known_first_party = bda,cof,gestioncof,kfet,shared,utils known_first_party = bda,cof,gestioncof,kfet,petitscours,shared,utils
line_length = 88 line_length = 88
multi_line_output = 3 multi_line_output = 3
not_skip = __init__.py not_skip = __init__.py

View file

@ -330,6 +330,7 @@ class ViewTestCaseMixin(TestCaseMixin):
kwargs=url_conf.get("kwargs", {}), kwargs=url_conf.get("kwargs", {}),
) )
for url_conf in self.urls_conf for url_conf in self.urls_conf
if url_conf["name"] is not None
] ]
@property @property