Merge branch 'aureplop/kfet-auth' into aureplop/kfet-auth_perms

This commit is contained in:
Aurélien Delobelle 2017-10-12 11:07:16 +02:00
commit 085a068020
28 changed files with 818 additions and 163 deletions

View file

@ -7,6 +7,7 @@ variables:
DJANGO_SETTINGS_MODULE: "cof.settings.prod"
DBHOST: "postgres"
REDIS_HOST: "redis"
REDIS_PASSWD: "dummy"
# Cached packages
PYTHONPATH: "$CI_PROJECT_DIR/vendor/python"
@ -16,6 +17,9 @@ variables:
POSTGRES_USER: "cof_gestion"
POSTGRES_DB: "cof_gestion"
# psql password authentication
PGPASSWORD: $POSTGRES_PASSWORD
cache:
paths:
@ -28,11 +32,11 @@ before_script:
- apt-get update -q && apt-get -o dir::cache::archives="vendor/apt" install -yqq postgresql-client
- sed -E 's/^REDIS_HOST.*/REDIS_HOST = "redis"/' cof/settings/secret_example.py > cof/settings/secret.py
# Remove the old test database if it has not been done yet
- psql --username=cof_gestion --password="4KZt3nGPLVeWSvtBZPSM3fSzXpzEU4" --host="$DBHOST"
-e "DROP DATABASE test_$DBNAME" || true
- psql --username=$POSTGRES_USER --host=$DBHOST -c "DROP DATABASE IF EXISTS test_$POSTGRES_DB"
- pip install --cache-dir vendor/pip -t vendor/python -r requirements.txt
- redis-cli config set requirepass $REDIS_PASSWD || true
test:
stage: test
script:
- python manage.py test
- python manage.py test -v3

View file

@ -56,17 +56,17 @@ class AttributionInline(admin.TabularInline):
def get_queryset(self, request):
qs = super().get_queryset(request)
if self.listing is not None:
qs.filter(spectacle__listing=self.listing)
qs = qs.filter(spectacle__listing=self.listing)
return qs
class WithListingAttributionInline(AttributionInline):
exclude = ('given', )
form = WithListingAttributionTabularAdminForm
listing = True
class WithoutListingAttributionInline(AttributionInline):
exclude = ('given', )
form = WithoutListingAttributionTabularAdminForm
listing = False

View file

@ -100,7 +100,7 @@ MIDDLEWARE_CLASSES = [
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.auth.middleware.SessionAuthenticationMiddleware',
'kfet.auth.middleware.KFetAuthenticationMiddleware',
'kfet.auth.middleware.TemporaryAuthMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'django.middleware.security.SecurityMiddleware',
@ -128,7 +128,7 @@ TEMPLATES = [
'wagtailmenus.context_processors.wagtailmenus',
'djconfig.context_processors.config',
'gestioncof.shared.context_processor',
'kfet.auth.context_processors.auth',
'kfet.auth.context_processors.temporary_auth',
'kfet.context_processors.config',
],
},
@ -191,7 +191,7 @@ CAS_EMAIL_FORMAT = "%s@clipper.ens.fr"
AUTHENTICATION_BACKENDS = (
'django.contrib.auth.backends.ModelBackend',
'gestioncof.shared.COFCASBackend',
'kfet.auth.backends.GenericTeamBackend',
'kfet.auth.backends.GenericBackend',
)
RECAPTCHA_USE_SSL = True

View file

@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('gestioncof', '0012_merge'),
]
operations = [
migrations.AlterField(
model_name='cofprofile',
name='occupation',
field=models.CharField(
verbose_name='Occupation',
max_length=9,
default='1A',
choices=[
('exterieur', 'Extérieur'),
('1A', '1A'),
('2A', '2A'),
('3A', '3A'),
('4A', '4A'),
('archicube', 'Archicube'),
('doctorant', 'Doctorant'),
('CST', 'CST'),
('PEI', 'PEI')
]),
),
migrations.AlterField(
model_name='cofprofile',
name='type_cotiz',
field=models.CharField(
verbose_name='Type de cotisation',
max_length=9,
default='normalien',
choices=[
('etudiant', 'Normalien étudiant'),
('normalien', 'Normalien élève'),
('exterieur', 'Extérieur'),
('gratis', 'Gratuit')
]),
),
]

View file

@ -8,23 +8,6 @@ from gestioncof.petits_cours_models import choices_length
from bda.models import Spectacle
OCCUPATION_CHOICES = (
('exterieur', _("Extérieur")),
('1A', _("1A")),
('2A', _("2A")),
('3A', _("3A")),
('4A', _("4A")),
('archicube', _("Archicube")),
('doctorant', _("Doctorant")),
('CST', _("CST")),
)
TYPE_COTIZ_CHOICES = (
('etudiant', _("Normalien étudiant")),
('normalien', _("Normalien élève")),
('exterieur', _("Extérieur")),
)
TYPE_COMMENT_FIELD = (
('text', _("Texte long")),
('char', _("Texte court")),
@ -32,6 +15,40 @@ TYPE_COMMENT_FIELD = (
class CofProfile(models.Model):
STATUS_EXTE = "exterieur"
STATUS_1A = "1A"
STATUS_2A = "2A"
STATUS_3A = "3A"
STATUS_4A = "4A"
STATUS_ARCHI = "archicube"
STATUS_DOCTORANT = "doctorant"
STATUS_CST = "CST"
STATUS_PEI = "PEI"
OCCUPATION_CHOICES = (
(STATUS_EXTE, _("Extérieur")),
(STATUS_1A, _("1A")),
(STATUS_2A, _("2A")),
(STATUS_3A, _("3A")),
(STATUS_4A, _("4A")),
(STATUS_ARCHI, _("Archicube")),
(STATUS_DOCTORANT, _("Doctorant")),
(STATUS_CST, _("CST")),
(STATUS_PEI, _("PEI")),
)
COTIZ_ETUDIANT = "etudiant"
COTIZ_NORMALIEN = "normalien"
COTIZ_EXTE = "exterieur"
COTIZ_GRATIS = "gratis"
TYPE_COTIZ_CHOICES = (
(COTIZ_ETUDIANT, _("Normalien étudiant")),
(COTIZ_NORMALIEN, _("Normalien élève")),
(COTIZ_EXTE, _("Extérieur")),
(COTIZ_GRATIS, _("Gratuit")),
)
user = models.OneToOneField(User, related_name="profile")
login_clipper = models.CharField(
"Login clipper", max_length=32, blank=True

View file

@ -11,7 +11,6 @@ class KFetConfig(AppConfig):
verbose_name = "Application K-Fêt"
def ready(self):
import kfet.signals
self.register_config()
def register_config(self):

View file

@ -1 +1,4 @@
default_app_config = 'kfet.auth.apps.KFetAuthConfig'
KFET_GENERIC_USERNAME = 'kfet_genericteam'
KFET_GENERIC_TRIGRAMME = 'GNR'

View file

@ -1,4 +1,5 @@
from django.apps import AppConfig
from django.db.models.signals import post_migrate
from django.utils.translation import ugettext_lazy as _
@ -6,3 +7,8 @@ class KFetAuthConfig(AppConfig):
name = 'kfet.auth'
label = 'kfetauth'
verbose_name = _("K-Fêt - Authentification et Autorisation")
def ready(self):
from . import signals # noqa
from .utils import setup_kfet_generic_user
post_migrate.connect(setup_kfet_generic_user, sender=self)

View file

@ -1,49 +1,17 @@
# -*- coding: utf-8 -*-
import hashlib
from django.contrib.auth.models import User, Permission
from gestioncof.models import CofProfile
from django.contrib.auth import get_user_model
from kfet.models import Account, GenericTeamToken
from .utils import get_kfet_generic_user
class KFetBackend(object):
def authenticate(self, request):
password = request.POST.get('KFETPASSWORD', '')
password = request.META.get('HTTP_KFETPASSWORD', password)
if not password:
return None
try:
password_sha256 = (
hashlib.sha256(password.encode('utf-8'))
.hexdigest()
)
account = Account.objects.get(password=password_sha256)
return account.cofprofile.user
except Account.DoesNotExist:
return None
User = get_user_model()
class GenericTeamBackend(object):
def authenticate(self, username=None, token=None):
valid_token = GenericTeamToken.objects.get(token=token)
if username == 'kfet_genericteam' and valid_token:
# Création du user s'il n'existe pas déjà
user, _ = User.objects.get_or_create(username='kfet_genericteam')
profile, _ = CofProfile.objects.get_or_create(user=user)
account, _ = Account.objects.get_or_create(
cofprofile=profile,
trigramme='GNR')
# Ajoute la permission kfet.is_team à ce user
perm_is_team = Permission.objects.get(codename='is_team')
user.user_permissions.add(perm_is_team)
return user
return None
class BaseKFetBackend:
def get_user(self, user_id):
"""
Add extra select related up to Account.
"""
try:
return (
User.objects
@ -52,3 +20,24 @@ class GenericTeamBackend(object):
)
except User.DoesNotExist:
return None
class AccountBackend(BaseKFetBackend):
def authenticate(self, request, kfet_password=None):
try:
return Account.objects.get_by_password(kfet_password).user
except Account.DoesNotExist:
return None
class GenericBackend(BaseKFetBackend):
def authenticate(self, request, kfet_token=None):
try:
team_token = GenericTeamToken.objects.get(token=kfet_token)
except GenericTeamToken.DoesNotExist:
return
# No need to keep the token.
team_token.delete()
return get_kfet_generic_user()

View file

@ -1,7 +1,7 @@
from django.contrib.auth.context_processors import PermWrapper
def auth(request):
def temporary_auth(request):
if hasattr(request, 'real_user'):
return {
'user': request.real_user,

View file

@ -1,12 +1,13 @@
# -*- coding: utf-8 -*-
from django.contrib.auth import get_user_model
from django.contrib.auth.models import User
from .backends import AccountBackend
from .backends import KFetBackend
User = get_user_model()
class KFetAuthenticationMiddleware(object):
"""Authenticate another user for this request if KFetBackend succeeds.
class TemporaryAuthMiddleware:
"""Authenticate another user for this request if AccountBackend succeeds.
By the way, if a user is authenticated, we refresh its from db to add
values from CofProfile and Account of this user.
@ -15,15 +16,23 @@ class KFetAuthenticationMiddleware(object):
def process_request(self, request):
if request.user.is_authenticated():
# avoid multiple db accesses in views and templates
user_pk = request.user.pk
request.user = (
User.objects
.select_related('profile__account_kfet')
.get(pk=user_pk)
.get(pk=request.user.pk)
)
kfet_backend = KFetBackend()
temp_request_user = kfet_backend.authenticate(request)
temp_request_user = AccountBackend().authenticate(
request,
kfet_password=self.get_kfet_password(request),
)
if temp_request_user:
request.real_user = request.user
request.user = temp_request_user
def get_kfet_password(self, request):
return (
request.META.get('HTTP_KFETPASSWORD') or
request.POST.get('KFETPASSWORD')
)

View file

@ -8,6 +8,9 @@ class Migration(migrations.Migration):
dependencies = [
('auth', '0006_require_contenttypes_0002'),
# Following dependency allows using Account model to set up the kfet
# generic user in post_migrate receiver.
('kfet', '0058_delete_genericteamtoken'),
]
operations = [

View file

@ -2,12 +2,24 @@ from django.contrib.auth.models import (
Group as DjangoGroup, Permission as DjangoPermission,
)
from django.db import models
from django.utils.crypto import get_random_string
from django.utils.translation import ugettext_lazy as _
class GenericTeamTokenManager(models.Manager):
def create_token(self):
token = get_random_string(50)
while self.filter(token=token).exists():
token = get_random_string(50)
return self.create(token=token)
class GenericTeamToken(models.Model):
token = models.CharField(max_length=50, unique=True)
objects = GenericTeamTokenManager()
class Group(DjangoGroup):

40
kfet/auth/signals.py Normal file
View file

@ -0,0 +1,40 @@
from django.contrib import messages
from django.contrib.auth.signals import user_logged_in
from django.core.urlresolvers import reverse
from django.dispatch import receiver
from django.utils.safestring import mark_safe
from django.utils.translation import ugettext as _
from .utils import get_kfet_generic_user
@receiver(user_logged_in)
def suggest_auth_generic(sender, request, user, **kwargs):
"""
Suggest logged in user to continue as the kfet generic user.
Message is only added if the following conditions are met:
- the next page (where user is going to be redirected due to successful
authentication) is related to kfet, i.e. 'k-fet' is in its url.
- logged in user is a kfet staff member (except the generic user).
"""
# Filter against the next page.
if not(hasattr(request, 'GET') and 'next' in request.GET):
return
next_page = request.GET['next']
generic_url = reverse('kfet.login.generic')
if not('k-fet' in next_page and not next_page.startswith(generic_url)):
return
# Filter against the logged in user.
if not(user.has_perm('kfet.is_team') and user != get_kfet_generic_user()):
return
# Seems legit to add message.
text = _("K-Fêt — Ouvrir une session partagée ?")
messages.info(request, mark_safe(
'<a href="#" data-url="{}" onclick="submit_url(this)">{}</a>'
.format(generic_url, text)
))

View file

@ -1,7 +0,0 @@
{% extends 'kfet/base.html' %}
{% block extra_head %}
<script type="text/javascript">
close();
</script>
{% endblock %}

View file

@ -1,16 +1,28 @@
# -*- coding: utf-8 -*-
from unittest import mock
from django.contrib.auth.models import Group as DjangoGroup, User
from django.contrib.auth.models import (
AnonymousUser, Group as DjangoGroup, User,
)
from django.contrib.contenttypes.models import ContentType
from django.test import TestCase
from django.core import signing
from django.core.urlresolvers import reverse
from django.test import RequestFactory, TestCase
from kfet.models import Account
from . import KFET_GENERIC_TRIGRAMME, KFET_GENERIC_USERNAME
from .backends import AccountBackend, GenericBackend
from .fields import GroupsField, CorePermissionsField
from .forms import GroupForm, UserGroupForm
from .middleware import TemporaryAuthMiddleware
from .models import GenericTeamToken, Group, Permission
from .utils import get_kfet_generic_user
from .views import GenericLoginView
##
# Forms, and theirs fields.
# Forms and form fields
##
@ -171,3 +183,303 @@ class PermissionTests(TestCase):
with self.assertRaises(Permission.DoesNotExist):
Permission.kfetcore.get(codename='code')
##
# K-Fêt generic user object
##
class KFetGenericUserTests(TestCase):
def test_exists(self):
"""
The account is set up when app is ready, so it should exist.
"""
generic = Account.objects.get_generic()
self.assertEqual(generic.trigramme, KFET_GENERIC_TRIGRAMME)
self.assertEqual(generic.user.username, KFET_GENERIC_USERNAME)
self.assertEqual(get_kfet_generic_user(), generic.user)
##
# Backends
##
class AccountBackendTests(TestCase):
def setUp(self):
self.request = RequestFactory().get('/')
def test_valid(self):
acc = Account(trigramme='000')
acc.change_pwd('valid')
acc.save({'username': 'user'})
auth = AccountBackend().authenticate(
self.request, kfet_password='valid')
self.assertEqual(auth, acc.user)
def test_invalid(self):
auth = AccountBackend().authenticate(
self.request, kfet_password='invalid')
self.assertIsNone(auth)
class GenericBackendTests(TestCase):
def setUp(self):
self.request = RequestFactory().get('/')
def test_valid(self):
token = GenericTeamToken.objects.create_token()
auth = GenericBackend().authenticate(
self.request, kfet_token=token.token)
self.assertEqual(auth, get_kfet_generic_user())
self.assertEqual(GenericTeamToken.objects.all().count(), 0)
def test_invalid(self):
auth = GenericBackend().authenticate(
self.request, kfet_token='invalid')
self.assertIsNone(auth)
##
# Views
##
class GenericLoginViewTests(TestCase):
def setUp(self):
patcher_messages = mock.patch('gestioncof.signals.messages')
patcher_messages.start()
self.addCleanup(patcher_messages.stop)
user_acc = Account(trigramme='000')
user_acc.save({'username': 'user'})
self.user = user_acc.user
self.user.set_password('user')
self.user.save()
team_acc = Account(trigramme='100')
team_acc.save({'username': 'team'})
self.team = team_acc.user
self.team.set_password('team')
self.team.save()
self.team.user_permissions.add(
Permission.objects.get(
content_type__app_label='kfet', codename='is_team'),
)
self.url = reverse('kfet.login.generic')
self.generic_user = get_kfet_generic_user()
def test_url(self):
self.assertEqual(self.url, '/k-fet/login/generic')
def test_notoken_get(self):
"""
Send confirmation for user to emit POST request, instead of GET.
"""
self.client.login(username='team', password='team')
r = self.client.get(self.url)
self.assertEqual(r.status_code, 200)
self.assertTemplateUsed(r, 'kfet/confirm_form.html')
def test_notoken_post(self):
"""
POST request without token in COOKIES sets a token and redirects to
logout url.
"""
self.client.login(username='team', password='team')
r = self.client.post(self.url)
self.assertRedirects(
r, '/logout?next={}'.format(self.url),
fetch_redirect_response=False,
)
def test_notoken_not_team(self):
"""
Logged in user must be a team user to initiate login as generic user.
"""
self.client.login(username='user', password='user')
# With GET.
r = self.client.get(self.url)
self.assertRedirects(
r, '/login?next={}'.format(self.url),
fetch_redirect_response=False,
)
# Also with POST.
r = self.client.post(self.url)
self.assertRedirects(
r, '/login?next={}'.format(self.url),
fetch_redirect_response=False,
)
def _set_signed_cookie(self, client, key, value):
signed_value = signing.get_cookie_signer(salt=key).sign(value)
client.cookies.load({key: signed_value})
def _is_cookie_deleted(self, client, key):
try:
self.assertNotIn(key, client.cookies)
except AssertionError:
try:
cookie = client.cookies[key]
# It also can be emptied.
self.assertEqual(cookie.value, '')
self.assertEqual(
cookie['expires'], 'Thu, 01-Jan-1970 00:00:00 GMT')
self.assertEqual(cookie['max-age'], 0)
except AssertionError:
raise AssertionError("The cookie '%s' still exists." % key)
def test_withtoken_valid(self):
"""
The kfet generic user is logged in.
"""
token = GenericTeamToken.objects.create(token='valid')
self._set_signed_cookie(
self.client, GenericLoginView.TOKEN_COOKIE_NAME, 'valid')
r = self.client.get(self.url)
self.assertRedirects(r, reverse('kfet.kpsul'))
self.assertEqual(r.wsgi_request.user, self.generic_user)
self._is_cookie_deleted(
self.client, GenericLoginView.TOKEN_COOKIE_NAME)
with self.assertRaises(GenericTeamToken.DoesNotExist):
token.refresh_from_db()
def test_withtoken_invalid(self):
"""
If token is invalid, delete it and try again.
"""
self._set_signed_cookie(
self.client, GenericLoginView.TOKEN_COOKIE_NAME, 'invalid')
r = self.client.get(self.url)
self.assertRedirects(r, self.url, fetch_redirect_response=False)
self.assertEqual(r.wsgi_request.user, AnonymousUser())
self._is_cookie_deleted(
self.client, GenericLoginView.TOKEN_COOKIE_NAME)
def test_flow_ok(self):
"""
A team user is logged in as the kfet generic user.
"""
self.client.login(username='team', password='team')
next_url = '/k-fet/'
r = self.client.post(
'{}?next={}'.format(self.url, next_url), follow=True)
self.assertEqual(r.wsgi_request.user, self.generic_user)
self.assertEqual(r.wsgi_request.path, '/k-fet/')
##
# Temporary authentication
#
# Includes:
# - TemporaryAuthMiddleware
# - temporary_auth context processor
##
class TemporaryAuthTests(TestCase):
def setUp(self):
patcher_messages = mock.patch('gestioncof.signals.messages')
patcher_messages.start()
self.addCleanup(patcher_messages.stop)
self.factory = RequestFactory()
user1_acc = Account(trigramme='000')
user1_acc.change_pwd('kfet_user1')
user1_acc.save({'username': 'user1'})
self.user1 = user1_acc.user
self.user1.set_password('user1')
self.user1.save()
user2_acc = Account(trigramme='100')
user2_acc.change_pwd('kfet_user2')
user2_acc.save({'username': 'user2'})
self.user2 = user2_acc.user
self.user2.set_password('user2')
self.user2.save()
self.perm = Permission.objects.get(
content_type__app_label='kfet', codename='is_team')
self.user2.user_permissions.add(self.perm)
def test_middleware_header(self):
"""
A user can be authenticated if ``HTTP_KFETPASSWORD`` header of a
request contains a valid kfet password.
"""
request = self.factory.get('/', HTTP_KFETPASSWORD='kfet_user2')
request.user = self.user1
TemporaryAuthMiddleware().process_request(request)
self.assertEqual(request.user, self.user2)
self.assertEqual(request.real_user, self.user1)
def test_middleware_post(self):
"""
A user can be authenticated if ``KFETPASSWORD`` of POST data contains
a valid kfet password.
"""
request = self.factory.post('/', {'KFETPASSWORD': 'kfet_user2'})
request.user = self.user1
TemporaryAuthMiddleware().process_request(request)
self.assertEqual(request.user, self.user2)
self.assertEqual(request.real_user, self.user1)
def test_middleware_invalid(self):
"""
The given password must be a password of an Account.
"""
request = self.factory.post('/', {'KFETPASSWORD': 'invalid'})
request.user = self.user1
TemporaryAuthMiddleware().process_request(request)
self.assertEqual(request.user, self.user1)
self.assertFalse(hasattr(request, 'real_user'))
def test_context_processor(self):
"""
Context variables give the real authenticated user and his permissions.
"""
self.client.login(username='user1', password='user1')
r = self.client.get('/k-fet/accounts/', HTTP_KFETPASSWORD='kfet_user2')
self.assertEqual(r.context['user'], self.user1)
self.assertNotIn('kfet.is_team', r.context['perms'])
def test_auth_not_persistent(self):
"""
The authentication is temporary, i.e. for one request.
"""
self.client.login(username='user1', password='user1')
r1 = self.client.get(
'/k-fet/accounts/', HTTP_KFETPASSWORD='kfet_user2')
self.assertEqual(r1.wsgi_request.user, self.user2)
r2 = self.client.get('/k-fet/accounts/')
self.assertEqual(r2.wsgi_request.user, self.user1)

34
kfet/auth/utils.py Normal file
View file

@ -0,0 +1,34 @@
import hashlib
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Permission
from kfet.models import Account
User = get_user_model()
def get_kfet_generic_user():
"""
Return the user related to the kfet generic account.
"""
return Account.objects.get_generic().user
def setup_kfet_generic_user(**kwargs):
"""
First steps of setup of the kfet generic user are done in a migration, as
it is more robust against database schema changes.
Following steps cannot be done from migration.
"""
generic = get_kfet_generic_user()
generic.user_permissions.add(
Permission.objects.get(
content_type__app_label='kfet',
codename='is_team',
)
)
def hash_password(password):
return hashlib.sha256(password.encode('utf-8')).hexdigest()

View file

@ -3,38 +3,105 @@ from django.contrib.messages.views import SuccessMessageMixin
from django.contrib.auth import authenticate, login
from django.contrib.auth.decorators import permission_required
from django.contrib.auth.models import User
from django.core.urlresolvers import reverse_lazy
from django.contrib.auth.views import redirect_to_login
from django.core.urlresolvers import reverse, reverse_lazy
from django.db.models import Prefetch
from django.shortcuts import render
from django.utils.crypto import get_random_string
from django.http import QueryDict
from django.shortcuts import redirect, render
from django.utils.decorators import method_decorator
from django.utils.translation import ugettext_lazy as _
from django.views.generic import View
from django.views.decorators.http import require_http_methods
from django.views.generic.edit import CreateView, UpdateView
from django_cas_ng.views import logout as cas_logout_view
from kfet.decorators import teamkfet_required
from .forms import GroupForm
from .models import GenericTeamToken, Group
@teamkfet_required
def login_genericteam(request):
# Check si besoin de déconnecter l'utilisateur de CAS
cas_logout = None
if request.user.profile.login_clipper:
# Récupèration de la vue de déconnexion de CAS
# Ici, car request sera modifié après
next_page = request.META.get('HTTP_REFERER', None)
cas_logout = cas_logout_view(request, next_page=next_page)
class GenericLoginView(View):
"""
View to authenticate as kfet generic user.
# Authentification du compte générique
token = GenericTeamToken.objects.create(token=get_random_string(50))
user = authenticate(username="kfet_genericteam", token=token.token)
login(request, user)
It is a 2-step view. First, issue a token if user is a team member and send
him to the logout view (for proper disconnect) with callback url to here.
Then authenticate the token to log in as the kfet generic user.
messages.success(request, "Connecté en utilisateur partagé")
Token is stored in COOKIES to avoid share it with the authentication
provider, which can be external. Session is unusable as it will be cleared
on logout.
"""
TOKEN_COOKIE_NAME = 'kfettoken'
return cas_logout or render(request, "kfet/login_genericteam.html")
@method_decorator(require_http_methods(['GET', 'POST']))
def dispatch(self, request, *args, **kwargs):
token = request.get_signed_cookie(self.TOKEN_COOKIE_NAME, None)
if not token:
if not request.user.has_perm('kfet.is_team'):
return redirect_to_login(request.get_full_path())
if request.method == 'POST':
# Step 1: set token and logout user.
return self.prepare_auth()
else:
# GET request should not change server/client states. Send a
# confirmation template to emit a POST request.
return render(request, 'kfet/confirm_form.html', {
'title': _("Ouvrir une session partagée"),
'text': _(
"Êtes-vous sûr·e de vouloir ouvrir une session "
"partagée ?"
),
})
else:
# Step 2: validate token.
return self.validate_auth(token)
def prepare_auth(self):
# Issue token.
token = GenericTeamToken.objects.create_token()
# Prepare callback of logout.
here_url = reverse(login_generic)
if 'next' in self.request.GET:
# Keep given next page.
here_qd = QueryDict(mutable=True)
here_qd['next'] = self.request.GET['next']
here_url += '?{}'.format(here_qd.urlencode())
logout_url = reverse('cof-logout')
logout_qd = QueryDict(mutable=True)
logout_qd['next'] = here_url
logout_url += '?{}'.format(logout_qd.urlencode(safe='/'))
resp = redirect(logout_url)
resp.set_signed_cookie(
self.TOKEN_COOKIE_NAME, token.token, httponly=True)
return resp
def validate_auth(self, token):
# Authenticate with GenericBackend.
user = authenticate(request=self.request, kfet_token=token)
if user:
# Log in generic user.
login(self.request, user)
messages.success(self.request, _(
"K-Fêt — Ouverture d'une session partagée."
))
resp = redirect(self.get_next_url())
else:
# Try again.
resp = redirect(self.request.get_full_path())
# Prevents blocking due to an invalid COOKIE.
resp.delete_cookie(self.TOKEN_COOKIE_NAME)
return resp
def get_next_url(self):
return self.request.GET.get('next', reverse('kfet.kpsul'))
login_generic = GenericLoginView.as_view()
@permission_required('kfet.manage_perms')

View file

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
from kfet.auth import KFET_GENERIC_TRIGRAMME, KFET_GENERIC_USERNAME
def setup_kfet_generic_user(apps, schema_editor):
"""
Setup models instances for the kfet generic account.
Username and trigramme are retrieved from kfet.auth.__init__ module.
Other data are registered here.
See also setup_kfet_generic_user from kfet.auth.utils module.
"""
User = apps.get_model('auth', 'User')
CofProfile = apps.get_model('gestioncof', 'CofProfile')
Account = apps.get_model('kfet', 'Account')
user, _ = User.objects.update_or_create(
username=KFET_GENERIC_USERNAME,
defaults={
'first_name': 'Compte générique K-Fêt',
},
)
profile, _ = CofProfile.objects.update_or_create(user=user)
account, _ = Account.objects.update_or_create(
cofprofile=profile,
defaults={
'trigramme': KFET_GENERIC_TRIGRAMME,
},
)
class Migration(migrations.Migration):
dependencies = [
('kfet', '0058_delete_genericteamtoken'),
]
operations = [
migrations.RunPython(setup_kfet_generic_user),
]

View file

@ -12,8 +12,8 @@ from django.db import transaction
from django.db.models import F
from datetime import date
import re
import hashlib
from .auth import KFET_GENERIC_TRIGRAMME
from .auth.models import GenericTeamToken, Group, Permission # noqa
from .config import kfet_config
@ -35,6 +35,23 @@ class AccountManager(models.Manager):
return super().get_queryset().select_related('cofprofile__user',
'negative')
def get_generic(self):
"""
Get the kfet generic account instance.
"""
return self.get(trigramme=KFET_GENERIC_TRIGRAMME)
def get_by_password(self, password):
"""
Get a kfet generic account by clear password.
Raises Account.DoesNotExist if no Account has this password.
"""
from .auth.utils import hash_password
if password is None:
raise self.model.DoesNotExist
return self.get(password=hash_password(password))
class Account(models.Model):
objects = AccountManager()
@ -238,10 +255,9 @@ class Account(models.Model):
self.cofprofile = cof
super(Account, self).save(*args, **kwargs)
def change_pwd(self, pwd):
pwd_sha256 = hashlib.sha256(pwd.encode('utf-8'))\
.hexdigest()
self.password = pwd_sha256
def change_pwd(self, clear_password):
from .auth.utils import hash_password
self.password = hash_password(clear_password)
# Surcharge de delete
# Pas de suppression possible

View file

@ -1,21 +0,0 @@
# -*- coding: utf-8 -*-
from django.contrib import messages
from django.contrib.auth.signals import user_logged_in
from django.core.urlresolvers import reverse
from django.dispatch import receiver
from django.utils.safestring import mark_safe
@receiver(user_logged_in)
def messages_on_login(sender, request, user, **kwargs):
if (not user.username == 'kfet_genericteam' and
user.has_perm('kfet.is_team') and
hasattr(request, 'GET') and
'k-fet' in request.GET.get('next', '')):
messages.info(request, mark_safe(
'<a href="{}" class="genericteam" target="_blank">'
' Connexion en utilisateur partagé ?'
'</a>'
.format(reverse('kfet.login.genericteam'))
))

View file

@ -1,22 +1,33 @@
$(document).ready(function() {
if (typeof Cookies !== 'undefined') {
// Retrieving csrf token
csrftoken = Cookies.get('csrftoken');
// Appending csrf token to ajax post requests
function csrfSafeMethod(method) {
// these HTTP methods do not require CSRF protection
return (/^(GET|HEAD|OPTIONS|TRACE)$/.test(method));
/**
* CSRF Token
*/
var csrftoken = '';
if (typeof Cookies !== 'undefined')
csrftoken = Cookies.get('csrftoken');
// Add CSRF token in header of AJAX requests.
function csrfSafeMethod(method) {
// these HTTP methods do not require CSRF protection
return (/^(GET|HEAD|OPTIONS|TRACE)$/.test(method));
}
$.ajaxSetup({
beforeSend: function(xhr, settings) {
if (!csrfSafeMethod(settings.type) && !this.crossDomain) {
xhr.setRequestHeader("X-CSRFToken", csrftoken);
}
$.ajaxSetup({
beforeSend: function(xhr, settings) {
if (!csrfSafeMethod(settings.type) && !this.crossDomain) {
xhr.setRequestHeader("X-CSRFToken", csrftoken);
}
}
});
}
});
function add_csrf_form($form) {
$form.append(
$('<input>', {'name': 'csrfmiddlewaretoken', 'value': csrftoken})
);
}
/*
* Generic Websocket class and k-psul ws instanciation
*/
@ -199,3 +210,28 @@ jconfirm.defaults = {
confirmButton: '<span class="glyphicon glyphicon-ok"></span>',
cancelButton: '<span class="glyphicon glyphicon-remove"></span>'
};
/**
* Create form node, given an url used as 'action', with csrftoken set.
*/
function create_form(url) {
let $form = $('<form>', {
'action': url,
'method': 'post',
});
add_csrf_form($form);
return $form;
}
/**
* Emit a POST request from <a> tag.
*
* Usage:
* <a href="#" data-url="{target url}" onclick="submit_url(this)">{}</a>
*/
function submit_url(el) {
let url = $(el).data('url');
create_form(url).appendTo($('body')).submit();
}

View file

@ -1,4 +1,4 @@
{% load static %}
{% load i18n static %}
{% load wagtailcore_tags %}
<nav class="navbar navbar-fixed-top">
@ -62,7 +62,8 @@
</ul>
<ul class="nav navbar-nav navbar-right nav-app">
{% if user.username == 'kfet_genericteam' %}
{% include "kfet/nav_item.html" with text="Équipe standard" %}
{% trans "Session partagée" as shared_str %}
{% include "kfet/nav_item.html" with text=shared_str glyphicon="sunglasses" %}
{% elif user.is_authenticated and not user.profile.account_kfet %}
{% include "kfet/nav_item.html" with class="disabled" href="#" glyphicon="user" text="Mon compte" %}
{% elif user.profile.account_kfet.readable %}
@ -87,7 +88,11 @@
<li><a href="{% url 'kfet.order' %}">Commandes</a></li>
{% if user.username != 'kfet_genericteam' %}
<li class="divider"></li>
<li><a href="{% url 'kfet.login.genericteam' %}" target="_blank" class="genericteam">Connexion standard</a></li>
<li>
<a href="#" data-url="{% url "kfet.login.generic" %}" onclick="submit_url(this)">
{% trans "Ouvrir une session partagée" %}
</a>
</li>
{% endif %}
{% if perms.kfet.change_settings %}
<li><a href="{% url 'kfet.settings' %}">Paramètres</a></li>
@ -118,13 +123,3 @@
</ul>
</div>
</nav>
<script type="text/javascript">
$(document).ready(function () {
$('.genericteam').on('click', function () {
setTimeout(function () { location.reload() }, 1000);
});
});
</script>

View file

@ -0,0 +1,20 @@
{% extends "kfet/base_form.html" %}
{% load i18n %}
{% block title %}{{ title }}{% endblock %}
{% block header %}{% endblock %}
{% block main-class %}main-bg main-padding text-center{% endblock %}
{% block main-size %}col-sm-8 col-sm-offset-2 col-md-6 col-md-offset-3{% endblock %}
{% block main %}
<form action="{{ confirm_url }}" method="post">
<p>
{{ text }}
</p>
<button type="submit" class="btn btn-primary">{% trans "Confirmer" %}</button>
{% csrf_token %}
</form>
{% endblock %}

View file

@ -4,6 +4,8 @@
<li class="{% if not href %}navbar-text{% endif %} {% if request.path == href %}active{% endif %} {{ class }}">
{% if href %}
<a href="{{ href }}" title="{{ text }}">
{% else %}
<span title="{{ text }}">
{% endif %}<!--
{% if glyphicon %}
--><span class="glyphicon glyphicon-{{ glyphicon }}"></span><!--
@ -14,5 +16,7 @@
-->
{% if href %}
</a>
{% else %}
</span>
{% endif %}
</li>

25
kfet/tests/test_models.py Normal file
View file

@ -0,0 +1,25 @@
from django.contrib.auth import get_user_model
from django.test import TestCase
from kfet.models import Account
User = get_user_model()
class AccountTests(TestCase):
def setUp(self):
self.account = Account(trigramme='000')
self.account.save({'username': 'user'})
def test_password(self):
self.account.change_pwd('anna')
self.account.save()
self.assertEqual(Account.objects.get_by_password('anna'), self.account)
with self.assertRaises(Account.DoesNotExist):
Account.objects.get_by_password(None)
with self.assertRaises(Account.DoesNotExist):
Account.objects.get_by_password('bernard')

View file

@ -8,8 +8,8 @@ from kfet.decorators import teamkfet_required
urlpatterns = [
url(r'^login/genericteam$', views.login_genericteam,
name='kfet.login.genericteam'),
url(r'^login/generic$', views.login_generic,
name='kfet.login.generic'),
url(r'^history$', views.history,
name='kfet.history'),

View file

@ -51,7 +51,7 @@ import statistics
from kfet.statistic import ScaleMixin, last_stats_manifest, tot_ventes, WeekScale
from .auth.views import ( # noqa
account_group, login_genericteam, AccountGroupCreate, AccountGroupUpdate,
account_group, login_generic, AccountGroupCreate, AccountGroupUpdate,
)