Thubrecht/daphne #815

Merged
thubrecht merged 39 commits from thubrecht/daphne into master 2024-07-30 18:40:56 +02:00
89 changed files with 753 additions and 376 deletions

View file

@ -43,13 +43,21 @@ variables:
# Keep this disabled for now, as it may kill GitLab...
# coverage: '/TOTAL.*\s(\d+\.\d+)\%$/'
kfettest:
stage: test
extends: .test_template
variables:
DJANGO_SETTINGS_MODULE: "gestioasso.settings.cof_prod"
script:
- coverage run manage.py test kfet
coftest:
stage: test
extends: .test_template
variables:
DJANGO_SETTINGS_MODULE: "gestioasso.settings.cof_prod"
script:
- coverage run manage.py test gestioncof bda kfet petitscours shared --parallel
- coverage run manage.py test gestioncof bda petitscours shared --parallel
bdstest:
stage: test

View file

@ -23,6 +23,9 @@ adhérents ni des cotisations.
## TODO Prod
- Lancer `python manage.py update_translation_fields` après la migration
- Mettre à jour les units systemd `daphne.service` et `worker.service`
- Créer un compte hCaptcha (https://www.hcaptcha.com/), au COF, et remplacer les secrets associés
## Version ??? - ??/??/????
@ -65,6 +68,8 @@ adhérents ni des cotisations.
- Fixe un problème de rendu causé par l'agrandissement du menu
- Mise à jour vers Channels 3.x et Django 3.2
## Version 0.12 - 17/06/2022
### K-Fêt

View file

@ -0,0 +1,23 @@
# Generated by Django 3.2.13 on 2022-06-30 10:45
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("bda", "0018_auto_20201021_1818"),
]
operations = [
migrations.AlterUniqueTogether(
name="choixspectacle",
unique_together=set(),
),
migrations.AddConstraint(
model_name="choixspectacle",
constraint=models.UniqueConstraint(
fields=("participant", "spectacle"), name="unique_participation"
),
),
]

View file

@ -253,7 +253,11 @@ class ChoixSpectacle(models.Model):
class Meta:
ordering = ("priority",)
unique_together = (("participant", "spectacle"),)
constraints = [
models.UniqueConstraint(
fields=["participant", "spectacle"], name="unique_participation"
)
]
verbose_name = "voeu"
verbose_name_plural = "voeux"

View file

@ -1,5 +1,5 @@
{% extends "base_title.html" %}
{% load staticfiles %}
{% load static %}
{% block extra_head %}
<link type="text/css" rel="stylesheet" href="{% static "bda/css/bda.css" %}" />

View file

@ -1,5 +1,5 @@
{% extends "base_title.html" %}
{% load staticfiles %}
{% load static %}
{% block realcontent %}
<h2>État des inscriptions BdA</h2>

View file

@ -1,5 +1,5 @@
{% extends "base_title.html" %}
{% load staticfiles %}
{% load static %}
{% block extra_head %}
<script type="text/javascript" src="{% static 'vendor/jquery/jquery-ui.min.js' %}" ></script>

View file

@ -1,5 +1,5 @@
{% extends "base_title.html" %}
{% load staticfiles %}
{% load static %}
{% block realcontent %}
<h2>{{ spectacle }}</h2>

View file

@ -1,5 +1,5 @@
{% extends "base_title.html" %}
{% load staticfiles %}
{% load static %}
{%block realcontent %}

View file

@ -1,5 +1,5 @@
{% extends "base_title.html" %}
{% load staticfiles %}
{% load static %}
{% block realcontent %}
<h2>Inscription à une revente</h2>

View file

@ -1,5 +1,5 @@
{% extends "base_title.html" %}
{% load staticfiles %}
{% load static %}
{% block realcontent %}

View file

@ -1,5 +1,5 @@
{% extends "base_title.html" %}
{% load staticfiles %}
{% load static %}
{% block realcontent %}

View file

@ -1,5 +1,5 @@
{% extends "base_title.html" %}
{% load staticfiles%}
{% load static %}
{% block realcontent %}
<h2>Inscriptions pour BdA-Revente</h2>

View file

@ -1,5 +1,5 @@
{% extends "base_title.html" %}
{% load staticfiles %}
{% load static %}
{% block realcontent %}

View file

@ -1,5 +1,5 @@
{% extends "base_title.html" %}
{% load staticfiles %}
{% load static %}
{% block extra_head %}
<link type="text/css" rel="stylesheet" href="{% static "bda/css/bda.css" %}" />

View file

@ -1,74 +1,80 @@
from django.conf.urls import url
from django.urls import re_path
from bda import views
from bda.views import SpectacleListView
from gestioncof.decorators import buro_required
urlpatterns = [
url(
re_path(
r"^inscription/(?P<tirage_id>\d+)$",
views.inscription,
name="bda-tirage-inscription",
),
url(r"^places/(?P<tirage_id>\d+)$", views.places, name="bda-places-attribuees"),
url(r"^etat-places/(?P<tirage_id>\d+)$", views.etat_places, name="bda-etat-places"),
url(r"^tirage/(?P<tirage_id>\d+)$", views.tirage, name="bda-tirage"),
url(
re_path(r"^places/(?P<tirage_id>\d+)$", views.places, name="bda-places-attribuees"),
re_path(
r"^etat-places/(?P<tirage_id>\d+)$", views.etat_places, name="bda-etat-places"
),
re_path(r"^tirage/(?P<tirage_id>\d+)$", views.tirage, name="bda-tirage"),
re_path(
r"^spectacles/(?P<tirage_id>\d+)$",
buro_required(SpectacleListView.as_view()),
name="bda-liste-spectacles",
),
url(
re_path(
r"^spectacles/(?P<tirage_id>\d+)/(?P<spectacle_id>\d+)$",
views.spectacle,
name="bda-spectacle",
),
url(
re_path(
r"^spectacles/unpaid/(?P<tirage_id>\d+)$",
views.UnpaidParticipants.as_view(),
name="bda-unpaid",
),
url(
re_path(
r"^spectacles/autocomplete$",
views.spectacle_autocomplete,
name="bda-spectacle-autocomplete",
),
url(
re_path(
r"^participants/autocomplete$",
views.participant_autocomplete,
name="bda-participant-autocomplete",
),
# Urls BdA-Revente
url(
re_path(
r"^revente/(?P<tirage_id>\d+)/manage$",
views.revente_manage,
name="bda-revente-manage",
),
url(
re_path(
r"^revente/(?P<tirage_id>\d+)/subscribe$",
views.revente_subscribe,
name="bda-revente-subscribe",
),
url(
re_path(
r"^revente/(?P<tirage_id>\d+)/tirages$",
views.revente_tirages,
name="bda-revente-tirages",
),
url(
re_path(
r"^revente/(?P<spectacle_id>\d+)/buy$",
views.revente_buy,
name="bda-revente-buy",
),
url(
re_path(
r"^revente/(?P<revente_id>\d+)/confirm$",
views.revente_confirm,
name="bda-revente-confirm",
),
url(
re_path(
r"^revente/(?P<tirage_id>\d+)/shotgun$",
views.revente_shotgun,
name="bda-revente-shotgun",
),
url(r"^mails-rappel/(?P<spectacle_id>\d+)$", views.send_rappel, name="bda-rappels"),
url(r"^catalogue/(?P<request_type>[a-z]+)$", views.catalogue, name="bda-catalogue"),
re_path(
r"^mails-rappel/(?P<spectacle_id>\d+)$", views.send_rappel, name="bda-rappels"
),
re_path(
r"^catalogue/(?P<request_type>[a-z]+)$", views.catalogue, name="bda-catalogue"
),
]

View file

@ -1 +0,0 @@
default_app_config = "bds.apps.BdsConfig"

View file

@ -1,5 +1,4 @@
from django import apps as global_apps
from django.apps import AppConfig
from django.apps import AppConfig, apps as global_apps
from django.db.models import Q
from django.db.models.signals import post_migrate

View file

@ -1,4 +1,4 @@
{% load staticfiles %}
{% load static %}
{% load bulma_utils %}
<!DOCTYPE html>

View file

@ -31,7 +31,7 @@ class TestHomeView(TestCase):
user, backend="django.contrib.auth.backends.ModelBackend"
)
resp = self.client.get(reverse("bds:home"))
self.assertEquals(resp.status_code, 200)
self.assertEqual(resp.status_code, 200)
class TestRegistrationView(TestCase):
@ -48,12 +48,12 @@ class TestRegistrationView(TestCase):
# Logged-in but unprivileged GET
client.force_login(user, backend="django.contrib.auth.backends.ModelBackend")
resp = client.get(url)
self.assertEquals(resp.status_code, 403)
self.assertEqual(resp.status_code, 403)
# Burô user GET
give_bds_buro_permissions(user)
resp = client.get(url)
self.assertEquals(resp.status_code, 200)
self.assertEqual(resp.status_code, 200)
@mock.patch("gestioncof.signals.messages")
def test_get(self, mock_messages):
@ -68,9 +68,9 @@ class TestRegistrationView(TestCase):
# Logged-in but unprivileged GET
client.force_login(user, backend="django.contrib.auth.backends.ModelBackend")
resp = client.get(url)
self.assertEquals(resp.status_code, 403)
self.assertEqual(resp.status_code, 403)
# Burô user GET
give_bds_buro_permissions(user)
resp = client.get(url)
self.assertEquals(resp.status_code, 200)
self.assertEqual(resp.status_code, 200)

View file

@ -0,0 +1,63 @@
# Generated by Django 3.2.13 on 2022-06-30 10:39
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("events", "0004_unique_constraints"),
]
operations = [
migrations.AlterUniqueTogether(
name="extrafield",
unique_together=set(),
),
migrations.AlterUniqueTogether(
name="extrafieldcontent",
unique_together=set(),
),
migrations.AlterUniqueTogether(
name="option",
unique_together=set(),
),
migrations.AlterUniqueTogether(
name="optionchoice",
unique_together=set(),
),
migrations.AlterUniqueTogether(
name="registration",
unique_together=set(),
),
migrations.AddConstraint(
model_name="extrafield",
constraint=models.UniqueConstraint(
fields=("event", "name"), name="unique_extra_field"
),
),
migrations.AddConstraint(
model_name="extrafieldcontent",
constraint=models.UniqueConstraint(
fields=("field", "registration"), name="unique_extra_field_content"
),
),
migrations.AddConstraint(
model_name="option",
constraint=models.UniqueConstraint(
fields=("event", "name"), name="unique_event_option"
),
),
migrations.AddConstraint(
model_name="optionchoice",
constraint=models.UniqueConstraint(
fields=("option", "choice"), name="unique_option_choice"
),
),
migrations.AddConstraint(
model_name="registration",
constraint=models.UniqueConstraint(
fields=("event", "user"), name="unique_registration"
),
),
]

View file

@ -72,9 +72,13 @@ class Option(models.Model):
multi_choices = models.BooleanField(_("choix multiples"), default=False)
class Meta:
constraints = [
models.UniqueConstraint(
fields=["event", "name"], name="unique_event_option"
)
]
verbose_name = _("option d'événement")
verbose_name_plural = _("options d'événement")
unique_together = [["event", "name"]]
def __str__(self):
return self.name
@ -87,9 +91,13 @@ class OptionChoice(models.Model):
choice = models.CharField(_("choix"), max_length=200)
class Meta:
constraints = [
models.UniqueConstraint(
fields=["option", "choice"], name="unique_option_choice"
)
]
verbose_name = _("choix d'option d'événement")
verbose_name_plural = _("choix d'option d'événement")
unique_together = [["option", "choice"]]
def __str__(self):
return self.choice
@ -118,7 +126,9 @@ class ExtraField(models.Model):
field_type = models.CharField(_("type de champ"), max_length=9, choices=FIELD_TYPE)
class Meta:
unique_together = [["event", "name"]]
constraints = [
models.UniqueConstraint(fields=["event", "name"], name="unique_extra_field")
]
class ExtraFieldContent(models.Model):
@ -137,9 +147,13 @@ class ExtraFieldContent(models.Model):
)
class Meta:
constraints = [
models.UniqueConstraint(
fields=["field", "registration"], name="unique_extra_field_content"
)
]
verbose_name = _("contenu d'un champ événement supplémentaire")
verbose_name_plural = _("contenus d'un champ événement supplémentaire")
unique_together = [["field", "registration"]]
def __str__(self):
max_length = 50
@ -163,9 +177,13 @@ class Registration(models.Model):
)
class Meta:
constraints = [
models.UniqueConstraint(
fields=["event", "user"], name="unique_registration"
)
]
verbose_name = _("inscription à un événement")
verbose_name_plural = _("inscriptions à un événement")
unique_together = [["event", "user"]]
def __str__(self):
return "inscription de {} à {}".format(self.user, self.event)

View file

@ -1,8 +1,15 @@
"""
ASGI entrypoint. Configures Django and then runs the application
defined in the ASGI_APPLICATION setting.
"""
import os
from channels.asgi import get_channel_layer
import django
from channels.routing import get_default_application
if "DJANGO_SETTINGS_MODULE" not in os.environ:
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "gestioasso.settings")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "gestioasso.settings.local")
channel_layer = get_channel_layer()
django.setup()
application = get_default_application()

View file

@ -1,3 +1,20 @@
from channels.routing import include
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
from django.urls import path
routing = [include("kfet.routing.routing", path=r"^/ws/k-fet")]
from kfet.routing import KFRouter
application = ProtocolTypeRouter(
{
# WebSocket chat handler
"websocket": AuthMiddlewareStack(
URLRouter(
[
path("ws/k-fet", KFRouter),
]
)
),
"http": get_asgi_application(),
}
)

View file

@ -85,7 +85,6 @@ MIDDLEWARE = (
+ MIDDLEWARE
+ [
"djconfig.middleware.DjConfigMiddleware",
"wagtail.core.middleware.SiteMiddleware",
"wagtail.contrib.redirects.middleware.RedirectMiddleware",
]
)
@ -109,6 +108,8 @@ MEDIA_URL = "/gestion/media/"
CORS_ORIGIN_WHITELIST = ("bda.ens.fr", "www.bda.ens.fr" "cof.ens.fr", "www.cof.ens.fr")
ASGI_APPLICATION = "gestioasso.routing.application"
# ---
# Auth-related stuff
# ---
@ -147,7 +148,7 @@ CACHES = {
CHANNEL_LAYERS = {
"default": {
"BACKEND": "asgi_redis.RedisChannelLayer",
"BACKEND": "shared.channels.ChannelLayer",
"CONFIG": {
"hosts": [
(
@ -160,11 +161,9 @@ CHANNEL_LAYERS = {
)
]
},
"ROUTING": "gestioasso.routing.routing",
}
}
# ---
# reCAPTCHA settings
# https://github.com/praekelt/django-recaptcha

View file

@ -101,7 +101,7 @@ TEMPLATES = [
DATABASES = {
"default": {
"ENGINE": "django.db.backends.postgresql_psycopg2",
"ENGINE": "django.db.backends.postgresql",
"NAME": DBNAME,
"USER": DBUSER,
"PASSWORD": DBPASSWD,
@ -111,6 +111,7 @@ DATABASES = {
SITE_ID = 1
DEFAULT_AUTO_FIELD = "django.db.models.AutoField"
# ---
# Internationalization

View file

@ -47,8 +47,7 @@ CACHES = {"default": {"BACKEND": "django.core.cache.backends.locmem.LocMemCache"
# Use the default in memory asgi backend for local development
CHANNEL_LAYERS = {
"default": {
"BACKEND": "asgiref.inmemory.ChannelLayer",
"ROUTING": "gestioasso.routing.routing",
"BACKEND": "channels.layers.InMemoryChannelLayer",
}
}

View file

@ -1 +0,0 @@
default_app_config = "gestioncof.apps.GestioncofConfig"

View file

@ -6,7 +6,7 @@ from django.contrib.auth.models import Group, Permission, User
from django.db.models import Q
from django.urls import reverse
from django.utils.safestring import mark_safe
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import gettext_lazy as _
from gestioncof.models import (
Club,

View file

@ -1 +0,0 @@
default_app_config = "gestioncof.cms.apps.COFCMSAppConfig"

View file

@ -2,7 +2,7 @@ from datetime import date, timedelta
from django import template
from django.utils import formats, timezone
from django.utils.translation import ugettext as _
from django.utils.translation import gettext as _
from ..models import COFActuPage, COFRootPage

View file

@ -3,7 +3,7 @@ from django.contrib.auth import get_user_model
from django.contrib.auth.forms import AuthenticationForm
from django.forms.formsets import BaseFormSet, formset_factory
from django.forms.widgets import CheckboxSelectMultiple, RadioSelect
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import gettext_lazy as _
from djconfig.forms import ConfigForm
from bda.models import Spectacle
@ -276,7 +276,9 @@ class RegistrationProfileForm(forms.ModelForm):
self.fields["mailing_bda_revente"].initial = True
self.fields["mailing_unernestaparis"].initial = True
self.fields.keyOrder = [
class Meta:
model = CofProfile
fields = [
"login_clipper",
"phone",
"occupation",
@ -290,22 +292,6 @@ class RegistrationProfileForm(forms.ModelForm):
"comments",
]
class Meta:
model = CofProfile
fields = (
"login_clipper",
"phone",
"occupation",
"departement",
"is_cof",
"type_cotiz",
"mailing_cof",
"mailing_bda",
"mailing_bda_revente",
"mailing_unernestaparis",
"comments",
)
STATUS_CHOICES = (
("no", "Non"),

View file

@ -0,0 +1,43 @@
# Generated by Django 3.2.13 on 2022-06-30 10:41
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("gestioncof", "0018_petitscours_email"),
]
operations = [
migrations.AlterUniqueTogether(
name="eventregistration",
unique_together=set(),
),
migrations.AlterUniqueTogether(
name="petitcoursability",
unique_together=set(),
),
migrations.AlterUniqueTogether(
name="surveyanswer",
unique_together=set(),
),
migrations.AddConstraint(
model_name="eventregistration",
constraint=models.UniqueConstraint(
fields=("user", "event"), name="unique_event_registration"
),
),
migrations.AddConstraint(
model_name="petitcoursability",
constraint=models.UniqueConstraint(
fields=("user", "niveau", "matiere"), name="unique_competence_level"
),
),
migrations.AddConstraint(
model_name="surveyanswer",
constraint=models.UniqueConstraint(
fields=("user", "survey"), name="unique_survey_answer"
),
),
]

View file

@ -2,7 +2,7 @@ from django.contrib.auth.models import User
from django.db import models
from django.db.models.signals import post_delete, post_save
from django.dispatch import receiver
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import gettext_lazy as _
from bda.models import Spectacle
from shared.utils import choices_length
@ -194,8 +194,12 @@ class EventRegistration(models.Model):
paid = models.BooleanField("A payé", default=False)
class Meta:
constraints = [
models.UniqueConstraint(
fields=["user", "event"], name="unique_event_registration"
)
]
verbose_name = "Inscription"
unique_together = ("user", "event")
def __str__(self):
return "Inscription de {} à {}".format(self.user, self.event.title)
@ -247,8 +251,12 @@ class SurveyAnswer(models.Model):
answers = models.ManyToManyField(SurveyQuestionAnswer, related_name="selected_by")
class Meta:
constraints = [
models.UniqueConstraint(
fields=["user", "survey"], name="unique_survey_answer"
)
]
verbose_name = "Réponses"
unique_together = ("user", "survey")
def __str__(self):
return "Réponse de %s sondage %s" % (

View file

@ -1,7 +1,7 @@
from django.contrib import messages
from django.contrib.auth.signals import user_logged_in
from django.dispatch import receiver
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import gettext_lazy as _
from django_cas_ng.signals import cas_user_authenticated

View file

@ -1,4 +1,4 @@
{% load staticfiles %}
{% load static %}
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
<html xmlns="http://www.w3.org/1999/xhtml" lang="fr">

View file

@ -1,5 +1,5 @@
{% extends "base_title.html" %}
{% load staticfiles %}
{% load static %}
{% block page_size %}col-sm-8{% endblock %}

View file

@ -1,4 +1,4 @@
{% load staticfiles %}
{% load static %}
<script type="text/javascript">
var supernifty_tristate = function() {
var

View file

@ -194,7 +194,9 @@ class RegistrationViewTests(ViewTestCaseMixin, TestCase):
)
er = e.eventregistration_set.get(user=self.users["user"])
self.assertQuerysetEqual(er.options.all(), map(repr, [oc1, oc3]), ordered=False)
self.assertQuerysetEqual(
er.options.all(), map(repr, [oc1, oc3]), transform=repr, ordered=False
)
self.assertCountEqual(
er.comments.values_list("content", flat=True), ["comment 1"]
)
@ -299,10 +301,10 @@ class RegistrationAutocompleteViewTests(MockLDAPMixin, ViewTestCaseMixin, TestCa
raise ValueError("Unexpected section name: {}".format(section.name))
self.assertQuerysetEqual(
others, map(str, expected_others), ordered=False, transform=str
others, map(str, expected_others), transform=str, ordered=False
)
self.assertQuerysetEqual(
members, map(str, expected_members), ordered=False, transform=str
members, map(str, expected_members), transform=str, ordered=False
)
self.assertSetEqual(
set(clippers), set(map(LDAPSearch().result_verbose_name, expected_clippers))
@ -648,7 +650,10 @@ class ClubListViewTests(ViewTestCaseMixin, TestCase):
self.assertEqual(r.status_code, 200)
self.assertQuerysetEqual(
r.context["owned_clubs"], map(repr, [self.c1, self.c2]), ordered=False
r.context["owned_clubs"],
map(repr, [self.c1, self.c2]),
transform=repr,
ordered=False,
)
@ -950,7 +955,10 @@ class EventViewTests(ViewTestCaseMixin, TestCase):
er = self.e.eventregistration_set.get(user=self.users["user"])
self.assertQuerysetEqual(
er.options.all(), map(repr, [self.oc1, self.oc3, self.oc4]), ordered=False
er.options.all(),
map(repr, [self.oc1, self.oc3, self.oc4]),
transform=repr,
ordered=False,
)
# TODO: Make the view care about comments.
# self.assertQuerysetEqual(
@ -975,7 +983,9 @@ class EventViewTests(ViewTestCaseMixin, TestCase):
self.assertIn(self.post_expected_message, get_messages(r.wsgi_request))
er.refresh_from_db()
self.assertQuerysetEqual(er.options.all(), map(repr, [self.oc3]), ordered=False)
self.assertQuerysetEqual(
er.options.all(), map(repr, [self.oc3]), transform=repr, ordered=False
)
# TODO: Make the view care about comments.
# self.assertQuerysetEqual(
# er.comments.all(), map(repr, []),
@ -1029,7 +1039,10 @@ class EventStatusViewTests(ViewTestCaseMixin, TestCase):
self.assertEqual(r.status_code, 200)
self.assertQuerysetEqual(
r.context["user_choices"], map(repr, expected), ordered=False
r.context["user_choices"],
map(repr, expected),
transform=repr,
ordered=False,
)
def test_filter_none(self):
@ -1096,7 +1109,10 @@ class SurveyViewTests(ViewTestCaseMixin, TestCase):
a = self.s.surveyanswer_set.get(user=self.users["user"])
self.assertQuerysetEqual(
a.answers.all(), map(repr, [self.qa1, self.qa3, self.qa4]), ordered=False
a.answers.all(),
map(repr, [self.qa1, self.qa3, self.qa4]),
transform=repr,
ordered=False,
)
def test_post_edit(self):
@ -1115,7 +1131,9 @@ class SurveyViewTests(ViewTestCaseMixin, TestCase):
self.assertIn(self.post_expected_message, get_messages(r.wsgi_request))
a.refresh_from_db()
self.assertQuerysetEqual(a.answers.all(), map(repr, [self.qa3]), ordered=False)
self.assertQuerysetEqual(
a.answers.all(), map(repr, [self.qa3]), transform=repr, ordered=False
)
def test_post_delete(self):
a = self.s.surveyanswer_set.create(user=self.users["user"])
@ -1196,7 +1214,10 @@ class SurveyStatusViewTests(ViewTestCaseMixin, TestCase):
self.assertEqual(r.status_code, 200)
self.assertQuerysetEqual(
r.context["user_answers"], map(repr, expected), ordered=False
r.context["user_answers"],
map(repr, expected),
transform=repr,
ordered=False,
)
def test_filter_none(self):

View file

@ -20,7 +20,7 @@ from django.shortcuts import get_object_or_404, redirect, render
from django.template import loader
from django.urls import reverse_lazy
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import gettext_lazy as _
from django.views.generic import FormView, TemplateView
from django_cas_ng.views import LogoutView as CasLogoutView
from icalendar import Calendar, Event as Vevent

View file

@ -1,3 +1,2 @@
default_app_config = "kfet.apps.KFetConfig"
KFET_DELETED_TRIGRAMME = "☠☠☠"
KFET_DELETED_USERNAME = "kfet_deleted_user"

View file

@ -1,4 +1,2 @@
default_app_config = "kfet.auth.apps.KFetAuthConfig"
KFET_GENERIC_USERNAME = "kfet_genericteam"
KFET_GENERIC_TRIGRAMME = "GNR"

View file

@ -1,6 +1,6 @@
from django.apps import AppConfig
from django.db.models.signals import post_migrate
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import gettext_lazy as _
class KFetAuthConfig(AppConfig):

View file

@ -1,5 +1,5 @@
from django.contrib.auth.models import User
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import gettext_lazy as _
from shared.forms import ProtectedModelForm

View file

@ -1,7 +1,7 @@
from django.contrib.auth.models import Group, Permission
from django.db import models
from django.utils.crypto import get_random_string
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import gettext_lazy as _
KFET_APP_LABELS = ["kfet", "kfetauth"]

View file

@ -3,7 +3,7 @@ from django.contrib.auth.signals import user_logged_in
from django.dispatch import receiver
from django.urls import reverse
from django.utils.safestring import mark_safe
from django.utils.translation import ugettext as _
from django.utils.translation import gettext_lazy as _
from .utils import get_kfet_generic_user

View file

@ -40,6 +40,7 @@ class UserGroupFormTests(TestCase):
self.assertQuerysetEqual(
groups_field.queryset,
[repr(g.group_ptr) for g in self.kfet_groups],
transform=repr,
ordered=False,
)

View file

@ -9,7 +9,7 @@ from django.http import QueryDict
from django.shortcuts import redirect, render
from django.urls import reverse, reverse_lazy
from django.utils.decorators import method_decorator
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import gettext_lazy as _
from django.views.decorators.http import require_http_methods
from django.views.generic import View
from django.views.generic.edit import CreateView, UpdateView

View file

@ -1 +0,0 @@
default_app_config = "kfet.cms.apps.KFetCMSAppConfig"

View file

@ -1,4 +1,4 @@
from django.contrib.staticfiles.templatetags.staticfiles import static
from django.templatetags.static import static
from django.utils.html import format_html
from wagtail.core import hooks

View file

@ -1,5 +1,5 @@
from django.db import models
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import gettext_lazy as _
from wagtail.admin.edit_handlers import (
FieldPanel,
FieldRowPanel,

View file

@ -1,6 +1,18 @@
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from .utils import DjangoJsonWebsocketConsumer, PermConsumerMixin
class KPsul(PermConsumerMixin, DjangoJsonWebsocketConsumer):
groups = ["kfet.kpsul"]
perms_connect = ["kfet.is_team"]
async def kpsul(self, event):
await self.send_json(event)
@classmethod
@async_to_sync
async def group_send(cls, group, data):
channel_layer = get_channel_layer()
await channel_layer.group_send(group, data)

View file

@ -8,7 +8,7 @@ from django.core import validators
from django.core.exceptions import ValidationError
from django.forms import modelformset_factory
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import gettext_lazy as _
from djconfig.forms import ConfigForm
from gestioncof.models import CofProfile

View file

@ -10,7 +10,7 @@ from django.db.models import F
from django.template import loader
from django.urls import reverse
from django.utils import timezone
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import gettext_lazy as _
from gestioncof.models import CofProfile
from shared.utils import choices_length

View file

@ -12,13 +12,15 @@ class OpenKfetConsumer(PermConsumerMixin, DjangoJsonWebsocketConsumer):
"""
def connection_groups(self, user, **kwargs):
"""Select which group the user should be connected."""
if kfet_is_team(user):
return ["kfet.open.team"]
return ["kfet.open.base"]
async def open_status(self, event):
await self.send_json(event)
def connect(self, message, *args, **kwargs):
async def connect(self):
"""Send current status on connect."""
super().connect(message, *args, **kwargs)
self.send(kfet_open.export(message.user))
await super().connect()
group = "team" if kfet_is_team(self.user) else "base"
await self.channel_layer.group_add(f"kfet.open.{group}", self.channel_name)
await self.send_json(kfet_open.export(self.user))

View file

@ -1,5 +1,6 @@
from datetime import timedelta
from channels.layers import get_channel_layer
from django.utils import timezone
from ..decorators import kfet_is_team
@ -77,7 +78,7 @@ class OpenKfet(CachedMixin, object):
"""
status = self.status()
base = {"status": status}
base = {"status": status, "type": "open.status"}
restrict = {
"admin_status": self.admin_status(status),
"force_close": self.force_close,
@ -95,13 +96,14 @@ class OpenKfet(CachedMixin, object):
base, team = self._export()
return team if kfet_is_team(user) else base
def send_ws(self):
async def send_ws(self):
"""Send internal state to websocket channels."""
from .consumers import OpenKfetConsumer
base, team = self._export()
OpenKfetConsumer.group_send("kfet.open.base", base)
OpenKfetConsumer.group_send("kfet.open.team", team)
channel_layer = get_channel_layer()
await channel_layer.group_send("kfet.open.base", base)
await channel_layer.group_send("kfet.open.team", team)
kfet_open = OpenKfet()

View file

@ -1,5 +1,10 @@
from channels.routing import route_class
from channels.routing import URLRouter
from django.urls import path
from . import consumers
from .consumers import OpenKfetConsumer
routing = [route_class(consumers.OpenKfetConsumer)]
OpenRouter = URLRouter(
[
path("", OpenKfetConsumer.as_asgi()),
]
)

View file

@ -1,19 +1,24 @@
import json
import random
from datetime import timedelta
from unittest import mock
from channels.channel import Group
from channels.test import ChannelTestCase, WSClient
from asgiref.sync import async_to_sync
from channels.auth import AuthMiddlewareStack
from channels.consumer import get_channel_layer
from channels.testing import WebsocketCommunicator
from django.contrib.auth.models import AnonymousUser, Permission, User
from django.test import Client
from django.test import Client, TestCase
from django.utils import timezone
from . import OpenKfet
from .consumers import OpenKfetConsumer
class OpenKfetTest(ChannelTestCase):
def ws_communicator(cls, path: str, headers=[]):
return WebsocketCommunicator(AuthMiddlewareStack(cls.as_asgi()), path, headers)
class OpenKfetTest(TestCase):
"""OpenKfet object unit-tests suite."""
def setUp(self):
@ -79,7 +84,7 @@ class OpenKfetTest(ChannelTestCase):
def test_export_user(self):
"""Export is limited for an anonymous user."""
export = self.kfet_open.export(AnonymousUser())
self.assertSetEqual(set(["status"]), set(export))
self.assertSetEqual(set(["status", "type"]), set(export))
def test_export_team(self):
"""Export all values for a team member."""
@ -89,24 +94,32 @@ class OpenKfetTest(ChannelTestCase):
)
user.user_permissions.add(is_team)
export = self.kfet_open.export(user)
self.assertSetEqual(set(["status", "admin_status", "force_close"]), set(export))
self.assertSetEqual(
set(["status", "admin_status", "force_close", "type"]), set(export)
)
def test_send_ws(self):
Group("kfet.open.base").add("test.open.base")
Group("kfet.open.team").add("test.open.team")
async def test_send_ws(self):
channel_layer = get_channel_layer()
base_channel = await channel_layer.new_channel()
team_channel = await channel_layer.new_channel()
self.kfet_open.send_ws()
await channel_layer.group_add("kfet.open.base", base_channel)
await channel_layer.group_add("kfet.open.team", team_channel)
recv_base = self.get_next_message("test.open.base", require=True)
base = json.loads(recv_base["text"])
self.assertSetEqual(set(["status"]), set(base))
await self.kfet_open.send_ws()
recv_admin = self.get_next_message("test.open.team", require=True)
admin = json.loads(recv_admin["text"])
self.assertSetEqual(set(["status", "admin_status", "force_close"]), set(admin))
base = await channel_layer.receive(base_channel)
self.assertSetEqual(set(["status", "type"]), set(base))
team = await channel_layer.receive(team_channel)
self.assertSetEqual(
set(["status", "admin_status", "force_close", "type"]), set(team)
)
class OpenKfetViewsTest(ChannelTestCase):
class OpenKfetViewsTest(TestCase):
"""OpenKfet views unit-tests suite."""
def setUp(self):
@ -177,119 +190,136 @@ class OpenKfetViewsTest(ChannelTestCase):
self.assertEqual(403, resp.status_code)
class OpenKfetConsumerTest(ChannelTestCase):
class OpenKfetConsumerTest(TestCase):
"""OpenKfet consumer unit-tests suite."""
def test_standard_user(self):
"""Lambda user is added to kfet.open.base group."""
# setup anonymous client
c = WSClient()
# connect
c.send_and_consume(
"websocket.connect", path="/ws/k-fet/open", fail_on_none=True
)
# initialization data is replied on connection
self.assertIsNotNone(c.receive())
# client belongs to the 'kfet.open' group...
OpenKfetConsumer.group_send("kfet.open.base", {"test": "plop"})
self.assertEqual(c.receive(), {"test": "plop"})
# ...but not to the 'kfet.open.admin' one
OpenKfetConsumer.group_send("kfet.open.team", {"test": "plop"})
self.assertIsNone(c.receive())
@mock.patch("gestioncof.signals.messages")
def test_team_user(self, mock_messages):
"""Team user is added to kfet.open.team group."""
# setup team user and its client
@classmethod
def setUpTestData(cls):
t = User.objects.create_user("team", "", "team")
is_team = Permission.objects.get(
codename="is_team", content_type__app_label="kfet"
)
t.user_permissions.add(is_team)
c = WSClient()
c.force_login(t, backend="django.contrib.auth.backends.ModelBackend")
# connect
c.send_and_consume(
"websocket.connect", path="/ws/k-fet/open", fail_on_none=True
)
cls.team_user = t
async def test_standard_user(self):
"""Lambda user is added to kfet.open.base group."""
# setup anonymous client
c = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
connected, _ = await c.connect()
self.assertTrue(connected)
# initialization data is replied on connection
self.assertIsNotNone(c.receive())
message = await c.receive_json_from()
self.assertIsNotNone(message)
# client belongs to the 'kfet.open.admin' group...
OpenKfetConsumer.group_send("kfet.open.team", {"test": "plop"})
self.assertEqual(c.receive(), {"test": "plop"})
# client belongs to the 'kfet.open' group...
channel_layer = get_channel_layer()
await channel_layer.group_send(
"kfet.open.base", {"test": "plop", "type": "open.status"}
)
message = await c.receive_json_from()
self.assertEqual(message, {"test": "plop"})
# ...but not to the 'kfet.open.admin' one
await channel_layer.group_send(
"kfet.open.team", {"test": "plop", "type": "open.status"}
)
self.assertTrue(await c.receive_nothing())
async def test_team_user(self):
"""Team user is added to kfet.open.team group."""
# On simule l'appartenance de l'user à la team kfet car l'utilisation de
# tests async avec postgres fait tout planter si on modifie la db dans un
# des sous tests.
with mock.patch("gestioncof.signals.messages"), mock.patch(
"kfet.open.consumers.kfet_is_team", return_value=True
):
c = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
connected, _ = await c.connect()
channel_layer = get_channel_layer()
self.assertTrue(connected)
# initialization data is replied on connection
message = await c.receive_json_from()
self.assertIsNotNone(message)
# client belongs to the 'kfet.open.team' group...
await channel_layer.group_send(
"kfet.open.team", {"test": "plop", "type": "open.status"}
)
message = await c.receive_json_from()
self.assertEqual(message, {"test": "plop"})
# ...but not to the 'kfet.open' one
OpenKfetConsumer.group_send("kfet.open.base", {"test": "plop"})
self.assertIsNone(c.receive())
await channel_layer.group_send(
"kfet.open.base", {"test": "plop", "type": "open.status"}
)
self.assertTrue(await c.receive_nothing())
class OpenKfetScenarioTest(ChannelTestCase):
class OpenKfetScenarioTest(TestCase):
"""OpenKfet functionnal tests suite."""
def setUp(self):
# Need this (and here) because of '<client>.login' in setUp
patcher_messages = mock.patch("gestioncof.signals.messages")
patcher_messages.start()
self.addCleanup(patcher_messages.stop)
@classmethod
def setUpTestData(cls):
# root user
cls.r = User.objects.create_superuser("team", "", "team")
# anonymous client (for views)
self.c = Client()
# anonymous client (for websockets)
self.c_ws = WSClient()
cls.c = Client()
# root user
self.r = User.objects.create_superuser("root", "", "root")
# its client (for views)
self.r_c = Client()
self.r_c.login(username="root", password="root")
# its client (for websockets)
self.r_c_ws = WSClient()
self.r_c_ws.force_login(
self.r, backend="django.contrib.auth.backends.ModelBackend"
)
# root client
cls.r_c = Client()
with mock.patch("gestioncof.signals.messages"):
cls.r_c.login(username="team", password="team")
def setUp(self):
# Create channels to listen to messages
channel_layer = get_channel_layer()
self.channel = async_to_sync(channel_layer.new_channel)()
self.team_channel = async_to_sync(channel_layer.new_channel)()
async_to_sync(channel_layer.group_add)("kfet.open.base", self.channel)
async_to_sync(channel_layer.group_add)("kfet.open.team", self.team_channel)
self.receive_msg = lambda c: async_to_sync(channel_layer.receive)(c)
self.kfet_open = OpenKfet(
cache_prefix="test_kfetopen_%s" % random.randrange(2**20)
)
self.addCleanup(self.kfet_open.clear_cache)
def ws_connect(self, ws_client):
ws_client.send_and_consume(
"websocket.connect", path="/ws/k-fet/open", fail_on_none=True
)
return ws_client.receive(json=True)
async def ws_connect(self, ws_communicator):
c, _ = await ws_communicator.connect()
def test_scenario_0(self):
"""Clients connect."""
# test for anonymous user
msg = self.ws_connect(self.c_ws)
self.assertSetEqual(set(["status"]), set(msg))
# test for root user
msg = self.ws_connect(self.r_c_ws)
self.assertSetEqual(set(["status", "admin_status", "force_close"]), set(msg))
self.assertTrue(c)
return await ws_communicator.receive_json_from()
def test_scenario_1(self):
"""Clients connect, door opens, enable force close."""
self.ws_connect(self.c_ws)
self.ws_connect(self.r_c_ws)
# door sent "I'm open!"
self.c.post("/k-fet/open/raw_open", {"raw_open": True, "token": "plop"})
# anonymous user agree
msg = self.c_ws.receive(json=True)
msg = self.receive_msg(self.channel)
self.assertEqual(OpenKfet.OPENED, msg["status"])
# root user too
msg = self.r_c_ws.receive(json=True)
msg = self.receive_msg(self.team_channel)
self.assertEqual(OpenKfet.OPENED, msg["status"])
self.assertEqual(OpenKfet.OPENED, msg["admin_status"])
@ -297,11 +327,11 @@ class OpenKfetScenarioTest(ChannelTestCase):
self.r_c.post("/k-fet/open/force_close", {"force_close": True})
# so anonymous user see it's closed
msg = self.c_ws.receive(json=True)
msg = self.receive_msg(self.channel)
self.assertEqual(OpenKfet.CLOSED, msg["status"])
# root user too
msg = self.r_c_ws.receive(json=True)
msg = self.receive_msg(self.team_channel)
self.assertEqual(OpenKfet.CLOSED, msg["status"])
# but root knows things
self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"])
@ -312,20 +342,42 @@ class OpenKfetScenarioTest(ChannelTestCase):
self.kfet_open.raw_open = True
self.kfet_open.force_close = True
msg = self.ws_connect(self.c_ws)
async_to_sync(OpenKfet().send_ws)()
msg = self.receive_msg(self.channel)
self.assertEqual(OpenKfet.CLOSED, msg["status"])
msg = self.ws_connect(self.r_c_ws)
msg = self.receive_msg(self.team_channel)
self.assertEqual(OpenKfet.CLOSED, msg["status"])
self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"])
self.assertTrue(msg["force_close"])
self.r_c.post("/k-fet/open/force_close", {"force_close": False})
msg = self.c_ws.receive(json=True)
msg = self.receive_msg(self.channel)
self.assertEqual(OpenKfet.OPENED, msg["status"])
msg = self.r_c_ws.receive(json=True)
msg = self.receive_msg(self.team_channel)
self.assertEqual(OpenKfet.OPENED, msg["status"])
self.assertEqual(OpenKfet.OPENED, msg["admin_status"])
self.assertFalse(msg["force_close"])
async def test_scenario_3(self):
"""Clients connect."""
# anonymous client (for websockets)
self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
# test for anonymous user
msg = await self.ws_connect(self.c_ws)
self.assertSetEqual(set(["status"]), set(msg))
# test for root user
with mock.patch(
"kfet.open.consumers.kfet_is_team", return_value=True
), mock.patch("kfet.open.open.kfet_is_team", return_value=True):
self.r_c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
msg = await self.ws_connect(self.r_c_ws)
self.assertSetEqual(
set(["status", "admin_status", "force_close"]), set(msg)
)

View file

@ -1,3 +1,4 @@
from asgiref.sync import async_to_sync
from django.conf import settings
from django.contrib.auth.decorators import permission_required
from django.core.exceptions import PermissionDenied
@ -18,7 +19,7 @@ def raw_open(request):
raise PermissionDenied
raw_open = request.POST.get("raw_open") in TRUE_STR
kfet_open.raw_open = raw_open
kfet_open.send_ws()
async_to_sync(kfet_open.send_ws)()
return HttpResponse()
@ -27,5 +28,5 @@ def raw_open(request):
def force_close(request):
force_close = request.POST.get("force_close") in TRUE_STR
kfet_open.force_close = force_close
kfet_open.send_ws()
async_to_sync(kfet_open.send_ws)()
return HttpResponse()

View file

@ -1,8 +1,13 @@
from channels.routing import include, route_class
from channels.routing import URLRouter
from django.urls import path
from . import consumers
from kfet.open.routing import OpenRouter
routing = [
route_class(consumers.KPsul, path=r"^/k-psul/$"),
include("kfet.open.routing.routing", path=r"^/open"),
from .consumers import KPsul
KFRouter = URLRouter(
[
path("k-psul/", KPsul.as_asgi()),
path("open", OpenRouter),
]
)

View file

@ -1,5 +1,5 @@
{% extends "kfet/base_form.html" %}
{% load staticfiles %}
{% load static %}
{% block title %}Nouveau compte{% endblock %}
{% block header-title %}Création d'un compte{% endblock %}

View file

@ -1,5 +1,5 @@
{% extends "kfet/base.html" %}
{% load staticfiles %}
{% load static %}
{% block title %}Nouveau compte{% endblock %}
{% block header-title %}Création d'un compte{% endblock %}

View file

@ -1,5 +1,5 @@
{% extends 'kfet/base_form.html' %}
{% load staticfiles %}
{% load static %}
{% load widget_tweaks %}
{% block title %}Permissions - Édition{% endblock %}

View file

@ -1,5 +1,5 @@
{% extends "kfet/base_col_2.html" %}
{% load staticfiles %}
{% load static %}
{% load kfet_tags %}
{% load l10n %}

View file

@ -1,5 +1,5 @@
{% extends 'kfet/base_col_2.html' %}
{% load staticfiles kfet_tags %}
{% load static kfet_tags %}
{% block extra_head %}
<script type="text/javascript" src="{% static 'kfet/vendor/Chart.min.js' %}"></script>

View file

@ -1,5 +1,5 @@
{% extends 'kfet/base_col_2.html' %}
{% load l10n staticfiles widget_tweaks bootstrap %}
{% load l10n static widget_tweaks bootstrap %}
{% block extra_head %}
<link rel="stylesheet" type="text/css" href="{% static 'kfet/vendor/multiple-select/multiple-select.css' %}">

View file

@ -1,5 +1,5 @@
{% extends "kfet/base_col_1.html" %}
{% load staticfiles %}
{% load static %}
{% load kfet_tags %}
{% block title %}Accueil{% endblock %}

View file

@ -1,5 +1,5 @@
{% extends 'kfet/base.html' %}
{% load staticfiles %}
{% load static %}
{% block extra_head %}
<link rel="stylesheet" style="text/css" href="{% static 'kfet/css/kpsul_grid.css' %}">

View file

@ -1,6 +1,5 @@
{% extends 'kfet/base_col_2.html' %}
{% load staticfiles %}
{% load l10n staticfiles widget_tweaks %}
{% load l10n static widget_tweaks %}
{% block title %}Transferts{% endblock %}
{% block header-title %}Transferts{% endblock %}

View file

@ -1,5 +1,5 @@
{% extends "kfet/base_col_1.html" %}
{% load staticfiles %}
{% load static %}
{% block extra_head %}
<link rel="stylesheet" type="text/css" href="{% static 'kfet/css/transfers_form.css' %}">

View file

@ -94,6 +94,7 @@ class PermHelpersTest(TestCaseMixin, TestCase):
self.assertQuerysetEqual(
user.user_permissions.all(),
map(repr, [self.perm1, self.perm2, self.perm_team]),
transform=repr,
ordered=False,
)

View file

@ -3,6 +3,8 @@ from datetime import datetime, timedelta
from decimal import Decimal
from unittest import mock
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.contrib.auth.models import User
from django.test import Client, TestCase
from django.urls import reverse
@ -518,6 +520,7 @@ class AccountGroupCreateViewTests(ViewTestCaseMixin, TestCase):
self.assertQuerysetEqual(
group.permissions.all(),
map(repr, [self.perms["kfet.is_team"], self.perms["kfet.manage_perms"]]),
transform=repr,
ordered=False,
)
@ -571,6 +574,7 @@ class AccountGroupUpdateViewTests(ViewTestCaseMixin, TestCase):
self.assertQuerysetEqual(
self.group.permissions.all(),
map(repr, [self.perms["kfet.is_team"], self.perms["kfet.manage_perms"]]),
transform=repr,
ordered=False,
)
@ -598,6 +602,7 @@ class AccountNegativeListViewTests(ViewTestCaseMixin, TestCase):
self.assertQuerysetEqual(
r.context["negatives"],
map(repr, [self.accounts["user"].negative]),
transform=repr,
ordered=False,
)
@ -698,7 +703,7 @@ class AccountStatOperationListViewTests(ViewTestCaseMixin, TestCase):
for stat, expected in zip(content["stats"], expected_stats):
expected_url = expected.pop("url")
self.assertUrlsEqual(stat["url"], expected_url)
self.assertDictContainsSubset(expected, stat)
self.assertEqual(stat, {**stat, **expected})
class AccountStatOperationViewTests(ViewTestCaseMixin, TestCase):
@ -807,7 +812,7 @@ class AccountStatBalanceListViewTests(ViewTestCaseMixin, TestCase):
for stat, expected in zip(content["stats"], expected_stats):
expected_url = expected.pop("url")
self.assertUrlsEqual(stat["url"], expected_url)
self.assertDictContainsSubset(expected, stat)
self.assertEqual(stat, {**stat, **expected})
class AccountStatBalanceViewTests(ViewTestCaseMixin, TestCase):
@ -875,6 +880,7 @@ class CheckoutListViewTests(ViewTestCaseMixin, TestCase):
self.assertQuerysetEqual(
r.context["checkouts"],
map(repr, [self.checkout1, self.checkout2]),
transform=repr,
ordered=False,
)
@ -1065,6 +1071,7 @@ class CheckoutStatementListViewTests(ViewTestCaseMixin, TestCase):
self.assertQuerysetEqual(
r.context["checkoutstatements"],
map(repr, expected_statements),
transform=repr,
ordered=False,
)
@ -1291,7 +1298,9 @@ class ArticleCategoryListViewTests(ViewTestCaseMixin, TestCase):
self.assertEqual(r.status_code, 200)
self.assertQuerysetEqual(
r.context["categories"], map(repr, [self.category1, self.category2])
r.context["categories"],
map(repr, [self.category1, self.category2]),
transform=repr,
)
@ -1366,7 +1375,9 @@ class ArticleListViewTests(ViewTestCaseMixin, TestCase):
r = self.client.get(self.url)
self.assertEqual(r.status_code, 200)
self.assertQuerysetEqual(
r.context["articles"], map(repr, [self.article1, self.article2])
r.context["articles"],
map(repr, [self.article1, self.article2]),
transform=repr,
)
@ -1636,7 +1647,7 @@ class ArticleStatSalesListViewTests(ViewTestCaseMixin, TestCase):
for stat, expected in zip(content["stats"], expected_stats):
expected_url = expected.pop("url")
self.assertUrlsEqual(stat["url"], expected_url)
self.assertDictContainsSubset(expected, stat)
self.assertEqual(stat, {**stat, **expected})
class ArticleStatSalesViewTests(ViewTestCaseMixin, TestCase):
@ -1705,7 +1716,7 @@ class KPsulCheckoutDataViewTests(ViewTestCaseMixin, TestCase):
expected = {"name": "Checkout", "balance": "10.00"}
self.assertDictContainsSubset(expected, content)
self.assertEqual(content, {**content, **expected})
self.assertSetEqual(
set(content.keys()),
@ -1808,10 +1819,13 @@ class KPsulPerformOperationsViewTests(ViewTestCaseMixin, TestCase):
self.account.balance = Decimal("50.00")
self.account.save()
# Mock consumer of K-Psul websocket to catch what we're sending
kpsul_consumer_patcher = mock.patch("kfet.consumers.KPsul")
self.kpsul_consumer_mock = kpsul_consumer_patcher.start()
self.addCleanup(kpsul_consumer_patcher.stop)
# Create a channel to listen to KPsul's messages
channel_layer = get_channel_layer()
self.channel = async_to_sync(channel_layer.new_channel)()
async_to_sync(channel_layer.group_add)("kfet.kpsul", self.channel)
self.receive_msg = lambda: async_to_sync(channel_layer.receive)(self.channel)
# Reset cache of kfet config
kfet_config._conf_init = False
@ -2043,9 +2057,12 @@ class KPsulPerformOperationsViewTests(ViewTestCaseMixin, TestCase):
self.assertEqual(self.article.stock, 18)
# Check websocket data
self.kpsul_consumer_mock.group_send.assert_called_once_with(
"kfet.kpsul",
ws_data = self.receive_msg()
self.assertDictEqual(
ws_data,
{
"type": "kpsul",
"groups": [
{
"add": True,
@ -2307,9 +2324,12 @@ class KPsulPerformOperationsViewTests(ViewTestCaseMixin, TestCase):
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("110.75"))
self.kpsul_consumer_mock.group_send.assert_called_once_with(
"kfet.kpsul",
ws_data = self.receive_msg()
self.assertDictEqual(
ws_data,
{
"type": "kpsul",
"groups": [
{
"add": True,
@ -2478,9 +2498,12 @@ class KPsulPerformOperationsViewTests(ViewTestCaseMixin, TestCase):
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("89.25"))
self.kpsul_consumer_mock.group_send.assert_called_once_with(
"kfet.kpsul",
ws_data = self.receive_msg()
self.assertDictEqual(
ws_data,
{
"type": "kpsul",
"groups": [
{
"add": True,
@ -2635,9 +2658,12 @@ class KPsulPerformOperationsViewTests(ViewTestCaseMixin, TestCase):
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("100.00"))
self.kpsul_consumer_mock.group_send.assert_called_once_with(
"kfet.kpsul",
ws_data = self.receive_msg()
self.assertDictEqual(
ws_data,
{
"type": "kpsul",
"groups": [
{
"add": True,
@ -2750,9 +2776,9 @@ class KPsulPerformOperationsViewTests(ViewTestCaseMixin, TestCase):
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("100.00"))
ws_data_ope = self.kpsul_consumer_mock.group_send.call_args[0][1]["groups"][0][
"entries"
][0]
ws_data = self.receive_msg()
ws_data_ope = ws_data["groups"][0]["entries"][0]
self.assertEqual(ws_data_ope["addcost_amount"], Decimal("1.00"))
self.assertEqual(ws_data_ope["addcost_for__trigramme"], "ADD")
@ -2790,9 +2816,9 @@ class KPsulPerformOperationsViewTests(ViewTestCaseMixin, TestCase):
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("100.00"))
ws_data_ope = self.kpsul_consumer_mock.group_send.call_args[0][1]["groups"][0][
"entries"
][0]
ws_data = self.receive_msg()
ws_data_ope = ws_data["groups"][0]["entries"][0]
self.assertEqual(ws_data_ope["addcost_amount"], Decimal("0.80"))
self.assertEqual(ws_data_ope["addcost_for__trigramme"], "ADD")
@ -2828,9 +2854,9 @@ class KPsulPerformOperationsViewTests(ViewTestCaseMixin, TestCase):
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("106.00"))
ws_data_ope = self.kpsul_consumer_mock.group_send.call_args[0][1]["groups"][0][
"entries"
][0]
ws_data = self.receive_msg()
ws_data_ope = ws_data["groups"][0]["entries"][0]
self.assertEqual(ws_data_ope["addcost_amount"], Decimal("1.00"))
self.assertEqual(ws_data_ope["addcost_for__trigramme"], "ADD")
@ -2864,9 +2890,9 @@ class KPsulPerformOperationsViewTests(ViewTestCaseMixin, TestCase):
self.accounts["addcost"].refresh_from_db()
self.assertEqual(self.accounts["addcost"].balance, Decimal("15.00"))
ws_data_ope = self.kpsul_consumer_mock.group_send.call_args[0][1]["groups"][0][
"entries"
][0]
ws_data = self.receive_msg()
ws_data_ope = ws_data["groups"][0]["entries"][0]
self.assertEqual(ws_data_ope["addcost_amount"], None)
self.assertEqual(ws_data_ope["addcost_for__trigramme"], None)
@ -2899,9 +2925,9 @@ class KPsulPerformOperationsViewTests(ViewTestCaseMixin, TestCase):
self.accounts["addcost"].refresh_from_db()
self.assertEqual(self.accounts["addcost"].balance, Decimal("0.00"))
ws_data_ope = self.kpsul_consumer_mock.group_send.call_args[0][1]["groups"][0][
"entries"
][0]
ws_data = self.receive_msg()
ws_data_ope = ws_data["groups"][0]["entries"][0]
self.assertEqual(ws_data_ope["addcost_amount"], None)
self.assertEqual(ws_data_ope["addcost_for__trigramme"], None)
@ -3123,9 +3149,12 @@ class KPsulPerformOperationsViewTests(ViewTestCaseMixin, TestCase):
self.assertEqual(article2.stock, -6)
# Check websocket data
self.kpsul_consumer_mock.group_send.assert_called_once_with(
"kfet.kpsul",
ws_data = self.receive_msg()
self.assertDictEqual(
ws_data,
{
"type": "kpsul",
"groups": [
{
"add": True,
@ -3218,10 +3247,14 @@ class KPsulCancelOperationsViewTests(ViewTestCaseMixin, TestCase):
self.account.balance = Decimal("50.00")
self.account.save()
# Mock consumer of K-Psul websocket to catch what we're sending
kpsul_consumer_patcher = mock.patch("kfet.consumers.KPsul")
self.kpsul_consumer_mock = kpsul_consumer_patcher.start()
self.addCleanup(kpsul_consumer_patcher.stop)
# Create a channel to listen to KPsul's messages
channel_layer = get_channel_layer()
self.channel = async_to_sync(channel_layer.new_channel)()
async_to_sync(channel_layer.group_add)("kfet.kpsul", self.channel)
self.receive_msg = lambda: async_to_sync(channel_layer.receive)(self.channel)
def _assertResponseOk(self, response):
"""
@ -3271,7 +3304,11 @@ class KPsulCancelOperationsViewTests(ViewTestCaseMixin, TestCase):
on_acc=self.account,
checkout=self.checkout,
content=[
{"type": Operation.PURCHASE, "article": self.article, "article_nb": 2}
{
"type": Operation.PURCHASE,
"article": self.article,
"article_nb": 2,
}
],
)
operation = group.opes.get()
@ -3345,9 +3382,15 @@ class KPsulCancelOperationsViewTests(ViewTestCaseMixin, TestCase):
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("100.00"))
self.kpsul_consumer_mock.group_send.assert_called_with(
"kfet.kpsul",
{"checkouts": [], "articles": [{"id": self.article.pk, "stock": 22}]},
ws_data = self.receive_msg()
self.assertDictEqual(
ws_data,
{
"type": "kpsul",
"checkouts": [],
"articles": [{"id": self.article.pk, "stock": 22}],
},
)
def test_purchase_with_addcost(self):
@ -3407,11 +3450,11 @@ class KPsulCancelOperationsViewTests(ViewTestCaseMixin, TestCase):
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("95.00"))
ws_data_checkouts = self.kpsul_consumer_mock.group_send.call_args[0][1][
"checkouts"
]
ws_data = self.receive_msg()
self.assertListEqual(
ws_data_checkouts, [{"id": self.checkout.pk, "balance": Decimal("95.00")}]
ws_data["checkouts"],
[{"id": self.checkout.pk, "balance": Decimal("95.00")}],
)
def test_purchase_cash_with_addcost(self):
@ -3447,11 +3490,11 @@ class KPsulCancelOperationsViewTests(ViewTestCaseMixin, TestCase):
addcost_account.refresh_from_db()
self.assertEqual(addcost_account.balance, Decimal("9.00"))
ws_data_checkouts = self.kpsul_consumer_mock.group_send.call_args[0][1][
"checkouts"
]
ws_data = self.receive_msg()
self.assertListEqual(
ws_data_checkouts, [{"id": self.checkout.pk, "balance": Decimal("94.00")}]
ws_data["checkouts"],
[{"id": self.checkout.pk, "balance": Decimal("94.00")}],
)
@mock.patch("django.utils.timezone.now")
@ -3533,9 +3576,12 @@ class KPsulCancelOperationsViewTests(ViewTestCaseMixin, TestCase):
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("89.25"))
self.kpsul_consumer_mock.group_send.assert_called_with(
"kfet.kpsul",
ws_data = self.receive_msg()
self.assertDictEqual(
ws_data,
{
"type": "kpsul",
"checkouts": [{"id": self.checkout.pk, "balance": Decimal("89.25")}],
"articles": [],
},
@ -3620,9 +3666,12 @@ class KPsulCancelOperationsViewTests(ViewTestCaseMixin, TestCase):
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("110.75"))
self.kpsul_consumer_mock.group_send.assert_called_with(
"kfet.kpsul",
ws_data = self.receive_msg()
self.assertDictEqual(
ws_data,
{
"type": "kpsul",
"checkouts": [{"id": self.checkout.pk, "balance": Decimal("110.75")}],
"articles": [],
},
@ -3707,9 +3756,11 @@ class KPsulCancelOperationsViewTests(ViewTestCaseMixin, TestCase):
self.checkout.refresh_from_db()
self.assertEqual(self.checkout.balance, Decimal("100.00"))
self.kpsul_consumer_mock.group_send.assert_called_with(
"kfet.kpsul",
{"checkouts": [], "articles": []},
ws_data = self.receive_msg()
self.assertDictEqual(
ws_data,
{"type": "kpsul", "checkouts": [], "articles": []},
)
@mock.patch("django.utils.timezone.now")
@ -4049,7 +4100,7 @@ class KPsulArticlesData(ViewTestCaseMixin, TestCase):
]
for expected, article in zip(expected_list, articles):
self.assertDictContainsSubset(expected, article)
self.assertEqual(article, {**article, **expected})
self.assertSetEqual(
set(article.keys()),
set(
@ -4149,7 +4200,7 @@ class AccountReadJSONViewTests(ViewTestCaseMixin, TestCase):
content = json.loads(r.content.decode("utf-8"))
expected = {"name": "first last", "trigramme": "000", "balance": "0.00"}
self.assertDictContainsSubset(expected, content)
self.assertEqual(content, {**content, **expected})
self.assertSetEqual(
set(content.keys()),
@ -4393,7 +4444,9 @@ class InventoryListViewTests(ViewTestCaseMixin, TestCase):
self.assertEqual(r.status_code, 200)
inventories = r.context["inventories"]
self.assertQuerysetEqual(inventories, map(repr, [self.inventory]))
self.assertQuerysetEqual(
inventories, map(repr, [self.inventory]), transform=repr
)
class InventoryCreateViewTests(ViewTestCaseMixin, TestCase):
@ -4580,7 +4633,7 @@ class OrderListViewTests(ViewTestCaseMixin, TestCase):
self.assertEqual(r.status_code, 200)
orders = r.context["orders"]
self.assertQuerysetEqual(orders, map(repr, [self.order]))
self.assertQuerysetEqual(orders, map(repr, [self.order]), transform=repr)
class OrderReadViewTests(ViewTestCaseMixin, TestCase):
@ -4795,7 +4848,9 @@ class OrderToInventoryViewTests(ViewTestCaseMixin, TestCase):
inventory,
{"by": self.accounts["team1"], "at": self.now, "order": self.order},
)
self.assertQuerysetEqual(inventory.articles.all(), map(repr, [self.article]))
self.assertQuerysetEqual(
inventory.articles.all(), map(repr, [self.article]), transform=repr
)
compte = InventoryArticle.objects.get(article=self.article)

View file

@ -1,8 +1,7 @@
import json
import math
from channels.channel import Group
from channels.generic.websockets import JsonWebsocketConsumer
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from django.core.cache import cache
from django.core.serializers.json import DjangoJSONEncoder
@ -63,7 +62,7 @@ class CachedMixin:
# Consumers
class DjangoJsonWebsocketConsumer(JsonWebsocketConsumer):
class DjangoJsonWebsocketConsumer(AsyncJsonWebsocketConsumer):
"""Custom Json Websocket Consumer.
Encode to JSON with DjangoJSONEncoder.
@ -71,7 +70,10 @@ class DjangoJsonWebsocketConsumer(JsonWebsocketConsumer):
"""
@classmethod
def encode_json(cls, content):
async def encode_json(cls, content):
# Remove the type value, only used by Channels to choose the group to send to
content.pop("type")
return json.dumps(content, cls=DjangoJSONEncoder)
@ -89,31 +91,11 @@ class PermConsumerMixin:
http_user = True # Enable message.user
perms_connect = []
def connect(self, message, **kwargs):
async def connect(self):
"""Check permissions on connection."""
if message.user.has_perms(self.perms_connect):
super().connect(message, **kwargs)
self.user = self.scope["user"]
if self.user.has_perms(self.perms_connect):
await super().connect()
else:
self.close()
def raw_connect(self, message, **kwargs):
# Same as original raw_connect method of JsonWebsocketConsumer
# We add user to connection_groups call.
groups = self.connection_groups(user=message.user, **kwargs)
for group in groups:
Group(group, channel_layer=message.channel_layer).add(message.reply_channel)
self.connect(message, **kwargs)
def raw_disconnect(self, message, **kwargs):
# Same as original raw_connect method of JsonWebsocketConsumer
# We add user to connection_groups call.
groups = self.connection_groups(user=message.user, **kwargs)
for group in groups:
Group(group, channel_layer=message.channel_layer).discard(
message.reply_channel
)
self.disconnect(message, **kwargs)
def connection_groups(self, user, **kwargs):
"""`message.user` is available as `user` arg. Original behavior."""
return super().connection_groups(user=user, **kwargs)
await self.close()

View file

@ -44,10 +44,11 @@ from django.views.generic.detail import BaseDetailView
from django.views.generic.edit import CreateView, DeleteView, UpdateView
from gestioncof.models import CofProfile
from kfet import KFET_DELETED_TRIGRAMME, consumers
from kfet import KFET_DELETED_TRIGRAMME
from kfet.auth.decorators import kfet_password_auth
from kfet.autocomplete import kfet_account_only_autocomplete, kfet_autocomplete
from kfet.config import kfet_config
from kfet.consumers import KPsul
from kfet.decorators import teamkfet_required
from kfet.forms import (
AccountForm,
@ -1054,8 +1055,13 @@ def kpsul_update_addcost(request):
kfet_config.set(addcost_for=account, addcost_amount=amount)
data = {"addcost": {"for": account and account.trigramme or None, "amount": amount}}
consumers.KPsul.group_send("kfet.kpsul", data)
data = {
"addcost": {"for": account and account.trigramme or None, "amount": amount},
"type": "kpsul",
}
KPsul.group_send("kfet.kpsul", data)
return JsonResponse(data)
@ -1239,7 +1245,7 @@ def kpsul_perform_operations(request):
)
# Websocket data
websocket_data = {}
websocket_data = {"type": "kpsul"}
websocket_data["groups"] = [
{
"add": True,
@ -1286,7 +1292,9 @@ def kpsul_perform_operations(request):
websocket_data["articles"].append(
{"id": article["id"], "stock": article["stock"]}
)
consumers.KPsul.group_send("kfet.kpsul", websocket_data)
KPsul.group_send("kfet.kpsul", websocket_data)
return JsonResponse(data)
@ -1481,7 +1489,7 @@ def cancel_operations(request):
articles = Article.objects.values("id", "stock").filter(pk__in=articles_pk)
# Websocket data
websocket_data = {"checkouts": [], "articles": []}
websocket_data = {"checkouts": [], "articles": [], "type": "kpsul"}
for checkout in checkouts:
websocket_data["checkouts"].append(
{"id": checkout["id"], "balance": checkout["balance"]}
@ -1490,7 +1498,8 @@ def cancel_operations(request):
websocket_data["articles"].append(
{"id": article["id"], "stock": article["stock"]}
)
consumers.KPsul.group_send("kfet.kpsul", websocket_data)
KPsul.group_send("kfet.kpsul", websocket_data)
data["canceled"] = list(opes)
data["opegroups_to_update"] = list(opegroups)

View file

@ -3,7 +3,7 @@ import os
import sys
if __name__ == "__main__":
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "gestioasso.settings.local")
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "gestioasso.settings")
from django.core.management import execute_from_command_line

View file

@ -2,7 +2,7 @@ from django.contrib.auth.models import User
from django.db import models
from django.db.models import Min
from django.utils.functional import cached_property
from django.utils.translation import ugettext_lazy as _
from django.utils.translation import gettext_lazy as _
from shared.utils import choices_length
@ -44,9 +44,13 @@ class PetitCoursAbility(models.Model):
class Meta:
app_label = "gestioncof"
constraints = [
models.UniqueConstraint(
fields=["user", "niveau", "matiere"], name="unique_competence_level"
)
]
verbose_name = "Compétence petits cours"
verbose_name_plural = "Compétences des petits cours"
unique_together = ("user", "niveau", "matiere")
def __str__(self):
return "{:s} - {!s} - {:s}".format(

View file

@ -1,5 +1,5 @@
{% extends "petitscours/base_title.html" %}
{% load staticfiles %}
{% load static %}
{% block page_size %}col-sm-8{% endblock %}

View file

@ -1,5 +1,5 @@
{% extends "petitscours/base_title.html" %}
{% load staticfiles %}
{% load static %}
{% block realcontent %}
<h2>Demandes de petits cours</h2>

View file

@ -1,4 +1,4 @@
{% load staticfiles %}
{% load static %}
<table class="table table-striped">
<tr class="danger"><td><strong>Date</strong></td><td> {{ demande.created }}</td></tr>
<tr class="warning"><td><strong>Nom/prénom</strong></td><td> {{ demande.name }}</td></tr>

View file

@ -1,5 +1,5 @@
{% extends "base_title.html" %}
{% load staticfiles %}
{% load static %}
{% block extra_head %}
<script src="{% static 'vendor/jquery/jquery-ui.min.js' %}" type="text/javascript"></script>

View file

@ -1,5 +1,5 @@
{% extends "petitscours/base_title.html" %}
{% load staticfiles %}
{% load static %}
{% block realcontent %}
<h2>

View file

@ -15,8 +15,8 @@ server {
rewrite ^/gestion$ http://localhost:8080/gestion/ redirect;
# Les pages statiques sont servies à part.
location /gestion/static { try_files $uri $uri/ =404; }
location /gestion/media { try_files $uri $uri/ =404; }
location /static { try_files $uri $uri/ =404; }
location /media { try_files $uri $uri/ =404; }
# On proxy-pass les requêtes vers les pages dynamiques à daphne
location / {

View file

@ -11,7 +11,7 @@ WorkingDirectory=/vagrant
Environment="DJANGO_SETTINGS_MODULE=gestioasso.settings.dev"
ExecStart=/home/vagrant/venv/bin/daphne \
-u /srv/gestiocof/gestiocof.sock \
cof.asgi:channel_layer
gestioasso.asgi:application
[Install]
WantedBy=multi-user.target

View file

@ -10,7 +10,10 @@ Group=vagrant
TimeoutSec=300
WorkingDirectory=/vagrant
Environment="DJANGO_SETTINGS_MODULE=gestioasso.settings.dev"
ExecStart=/home/vagrant/venv/bin/python manage.py runworker
ExecStart=/home/vagrant/venv/bin/python manage.py runworker \
'kfet.open.team' \
'kfet.open.base' \
'kpsul'
[Install]
WantedBy=multi-user.target

View file

@ -3,6 +3,6 @@ django-debug-toolbar==3.2.*
ipython
# Tools
black
black==22.3.0
flake8
isort

View file

@ -1,16 +1,15 @@
-r requirements.txt
# Postgresql bindings
psycopg2<2.8
psycopg2==2.9.*
# Redis
django-redis-cache==2.1.*
redis~=2.10.6
asgi-redis==1.4.*
django-redis-cache==3.0.*
redis==3.5.*
channels-redis==3.4.*
# ASGI protocol and HTTP server
asgiref~=1.1.2
daphne==1.3.0
daphne==3.0.*
# ldap bindings
python-ldap

View file

@ -1,19 +1,19 @@
Django==2.2.*
Django==3.2.*
Pillow==7.2.0
authens==0.1b0
channels==1.1.*
authens==0.1b4
channels==3.0.*
configparser==3.5.0
django-autocomplete-light==3.3.*
django-autocomplete-light==3.9.4
django-bootstrap-form==3.3
django-cas-ng==3.6.*
django-cors-headers==2.2.0
django-djconfig==0.8.0
django-hCaptcha==0.1.0
django-cas-ng==4.3.*
django-cors-headers==3.13.0
django-djconfig==0.10.0
django-hCaptcha==0.2.0
django-js-reverse==0.9.1
django-widget-tweaks==1.4.1
icalendar==4.0.7
python-dateutil==2.8.1
statistics==1.0.3.5
wagtail-modeltranslation==0.10.*
wagtail==2.7.*
wagtail-modeltranslation==0.11.*
wagtail==2.13.*
wagtailmenus==3.0.*

View file

@ -3,8 +3,8 @@ source =
bda
bds
clubs
cof
events
gestioasso
gestioncof
kfet
petitscours

45
shared/channels.py Normal file
View file

@ -0,0 +1,45 @@
import datetime
import random
from decimal import Decimal
import msgpack
from channels_redis.core import RedisChannelLayer
def encode_kf(obj):
if isinstance(obj, Decimal):
return {"__decimal__": True, "as_str": str(obj)}
elif isinstance(obj, datetime.datetime):
return {"__datetime__": True, "as_str": obj.strftime("%Y%m%dT%H:%M:%S.%f")}
return obj
def decode_kf(obj):
if "__decimal__" in obj:
obj = Decimal(obj["as_str"])
elif "__datetime__" in obj:
obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
return obj
class ChannelLayer(RedisChannelLayer):
def serialize(self, message):
"""Serializes to a byte string."""
value = msgpack.packb(message, default=encode_kf, use_bin_type=True)
if self.crypter:
value = self.crypter.encrypt(value)
# As we use an sorted set to expire messages
# we need to guarantee uniqueness, with 12 bytes.
random_prefix = random.getrandbits(8 * 12).to_bytes(12, "big")
return random_prefix + value
def deserialize(self, message):
"""Deserializes from a byte string."""
# Removes the random prefix
message = message[12:]
if self.crypter:
message = self.crypter.decrypt(message, self.expiry + 10)
return msgpack.unpackb(message, object_hook=decode_kf, raw=False)