Merge branch 'aureplop/kfet-auth_backends'

This commit is contained in:
Martin Pépin 2017-10-11 22:59:28 +02:00
commit 3b1d8487e2
33 changed files with 1001 additions and 336 deletions

View file

@ -90,6 +90,7 @@ INSTALLED_APPS = [
'wagtailmenus',
'modelcluster',
'taggit',
'kfet.auth',
'kfet.cms',
]
@ -99,7 +100,7 @@ MIDDLEWARE_CLASSES = [
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.auth.middleware.SessionAuthenticationMiddleware',
'kfet.middleware.KFetAuthenticationMiddleware',
'kfet.auth.middleware.TemporaryAuthMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
'django.middleware.security.SecurityMiddleware',
@ -127,7 +128,7 @@ TEMPLATES = [
'wagtailmenus.context_processors.wagtailmenus',
'djconfig.context_processors.config',
'gestioncof.shared.context_processor',
'kfet.context_processors.auth',
'kfet.auth.context_processors.temporary_auth',
'kfet.context_processors.config',
],
},
@ -190,7 +191,7 @@ CAS_EMAIL_FORMAT = "%s@clipper.ens.fr"
AUTHENTICATION_BACKENDS = (
'django.contrib.auth.backends.ModelBackend',
'gestioncof.shared.COFCASBackend',
'kfet.backends.GenericTeamBackend',
'kfet.auth.backends.GenericBackend',
)
RECAPTCHA_USE_SSL = True

View file

@ -11,7 +11,6 @@ class KFetConfig(AppConfig):
verbose_name = "Application K-Fêt"
def ready(self):
import kfet.signals
self.register_config()
def register_config(self):

4
kfet/auth/__init__.py Normal file
View file

@ -0,0 +1,4 @@
default_app_config = 'kfet.auth.apps.KFetAuthConfig'
KFET_GENERIC_USERNAME = 'kfet_genericteam'
KFET_GENERIC_TRIGRAMME = 'GNR'

14
kfet/auth/apps.py Normal file
View file

@ -0,0 +1,14 @@
from django.apps import AppConfig
from django.db.models.signals import post_migrate
from django.utils.translation import ugettext_lazy as _
class KFetAuthConfig(AppConfig):
name = 'kfet.auth'
label = 'kfetauth'
verbose_name = _("K-Fêt - Authentification et Autorisation")
def ready(self):
from . import signals # noqa
from .utils import setup_kfet_generic_user
post_migrate.connect(setup_kfet_generic_user, sender=self)

43
kfet/auth/backends.py Normal file
View file

@ -0,0 +1,43 @@
# -*- coding: utf-8 -*-
from django.contrib.auth import get_user_model
from kfet.models import Account, GenericTeamToken
from .utils import get_kfet_generic_user
User = get_user_model()
class BaseKFetBackend:
def get_user(self, user_id):
"""
Add extra select related up to Account.
"""
try:
return (
User.objects
.select_related('profile__account_kfet')
.get(pk=user_id)
)
except User.DoesNotExist:
return None
class AccountBackend(BaseKFetBackend):
def authenticate(self, request, kfet_password=None):
try:
return Account.objects.get_by_password(kfet_password).user
except Account.DoesNotExist:
return None
class GenericBackend(BaseKFetBackend):
def authenticate(self, request, kfet_token=None):
try:
team_token = GenericTeamToken.objects.get(token=kfet_token)
except GenericTeamToken.DoesNotExist:
return
# No need to keep the token.
team_token.delete()
return get_kfet_generic_user()

View file

@ -0,0 +1,10 @@
from django.contrib.auth.context_processors import PermWrapper
def temporary_auth(request):
if hasattr(request, 'real_user'):
return {
'user': request.real_user,
'perms': PermWrapper(request.real_user),
}
return {}

20
kfet/auth/fields.py Normal file
View file

@ -0,0 +1,20 @@
from django import forms
from django.contrib.auth.models import Permission
from django.contrib.contenttypes.models import ContentType
from django.forms import widgets
class KFetPermissionsField(forms.ModelMultipleChoiceField):
def __init__(self, *args, **kwargs):
queryset = Permission.objects.filter(
content_type__in=ContentType.objects.filter(app_label="kfet"),
)
super().__init__(
queryset=queryset,
widget=widgets.CheckboxSelectMultiple,
*args, **kwargs
)
def label_from_instance(self, obj):
return obj.name

44
kfet/auth/forms.py Normal file
View file

@ -0,0 +1,44 @@
from django import forms
from django.contrib.auth.models import Group, User
from .fields import KFetPermissionsField
class GroupForm(forms.ModelForm):
permissions = KFetPermissionsField()
def clean_name(self):
name = self.cleaned_data['name']
return 'K-Fêt %s' % name
def clean_permissions(self):
kfet_perms = self.cleaned_data['permissions']
# TODO: With Django >=1.11, the QuerySet method 'difference' can be
# used.
# other_groups = self.instance.permissions.difference(
# self.fields['permissions'].queryset
# )
other_perms = self.instance.permissions.exclude(
pk__in=[p.pk for p in self.fields['permissions'].queryset],
)
return list(kfet_perms) + list(other_perms)
class Meta:
model = Group
fields = ['name', 'permissions']
class UserGroupForm(forms.ModelForm):
groups = forms.ModelMultipleChoiceField(
Group.objects.filter(name__icontains='K-Fêt'),
label='Statut équipe',
required=False)
def clean_groups(self):
kfet_groups = self.cleaned_data.get('groups')
other_groups = self.instance.groups.exclude(name__icontains='K-Fêt')
return list(kfet_groups) + list(other_groups)
class Meta:
model = User
fields = ['groups']

38
kfet/auth/middleware.py Normal file
View file

@ -0,0 +1,38 @@
# -*- coding: utf-8 -*-
from django.contrib.auth import get_user_model
from .backends import AccountBackend
User = get_user_model()
class TemporaryAuthMiddleware:
"""Authenticate another user for this request if AccountBackend succeeds.
By the way, if a user is authenticated, we refresh its from db to add
values from CofProfile and Account of this user.
"""
def process_request(self, request):
if request.user.is_authenticated():
# avoid multiple db accesses in views and templates
request.user = (
User.objects
.select_related('profile__account_kfet')
.get(pk=request.user.pk)
)
temp_request_user = AccountBackend().authenticate(
request,
kfet_password=self.get_kfet_password(request),
)
if temp_request_user:
request.real_user = request.user
request.user = temp_request_user
def get_kfet_password(self, request):
return (
request.META.get('HTTP_KFETPASSWORD') or
request.POST.get('KFETPASSWORD')
)

View file

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('auth', '0006_require_contenttypes_0002'),
# Following dependency allows using Account model to set up the kfet
# generic user in post_migrate receiver.
('kfet', '0058_delete_genericteamtoken'),
]
operations = [
migrations.CreateModel(
name='GenericTeamToken',
fields=[
('id', models.AutoField(verbose_name='ID', auto_created=True, serialize=False, primary_key=True)),
('token', models.CharField(unique=True, max_length=50)),
],
),
]

View file

17
kfet/auth/models.py Normal file
View file

@ -0,0 +1,17 @@
from django.db import models
from django.utils.crypto import get_random_string
class GenericTeamTokenManager(models.Manager):
def create_token(self):
token = get_random_string(50)
while self.filter(token=token).exists():
token = get_random_string(50)
return self.create(token=token)
class GenericTeamToken(models.Model):
token = models.CharField(max_length=50, unique=True)
objects = GenericTeamTokenManager()

40
kfet/auth/signals.py Normal file
View file

@ -0,0 +1,40 @@
from django.contrib import messages
from django.contrib.auth.signals import user_logged_in
from django.core.urlresolvers import reverse
from django.dispatch import receiver
from django.utils.safestring import mark_safe
from django.utils.translation import ugettext as _
from .utils import get_kfet_generic_user
@receiver(user_logged_in)
def suggest_auth_generic(sender, request, user, **kwargs):
"""
Suggest logged in user to continue as the kfet generic user.
Message is only added if the following conditions are met:
- the next page (where user is going to be redirected due to successful
authentication) is related to kfet, i.e. 'k-fet' is in its url.
- logged in user is a kfet staff member (except the generic user).
"""
# Filter against the next page.
if not(hasattr(request, 'GET') and 'next' in request.GET):
return
next_page = request.GET['next']
generic_url = reverse('kfet.login.generic')
if not('k-fet' in next_page and not next_page.startswith(generic_url)):
return
# Filter against the logged in user.
if not(user.has_perm('kfet.is_team') and user != get_kfet_generic_user()):
return
# Seems legit to add message.
text = _("K-Fêt — Ouvrir une session partagée ?")
messages.info(request, mark_safe(
'<a href="#" data-url="{}" onclick="submit_url(this)">{}</a>'
.format(generic_url, text)
))

367
kfet/auth/tests.py Normal file
View file

@ -0,0 +1,367 @@
# -*- coding: utf-8 -*-
from unittest import mock
from django.core import signing
from django.core.urlresolvers import reverse
from django.contrib.auth.models import AnonymousUser, Group, Permission, User
from django.test import RequestFactory, TestCase
from kfet.forms import UserGroupForm
from kfet.models import Account
from . import KFET_GENERIC_TRIGRAMME, KFET_GENERIC_USERNAME
from .backends import AccountBackend, GenericBackend
from .middleware import TemporaryAuthMiddleware
from .models import GenericTeamToken
from .utils import get_kfet_generic_user
from .views import GenericLoginView
##
# Forms
##
class UserGroupFormTests(TestCase):
"""Test suite for UserGroupForm."""
def setUp(self):
# create user
self.user = User.objects.create(username="foo", password="foo")
# create some K-Fêt groups
prefix_name = "K-Fêt "
names = ["Group 1", "Group 2", "Group 3"]
self.kfet_groups = [
Group.objects.create(name=prefix_name+name)
for name in names
]
# create a non-K-Fêt group
self.other_group = Group.objects.create(name="Other group")
def test_choices(self):
"""Only K-Fêt groups are selectable."""
form = UserGroupForm(instance=self.user)
groups_field = form.fields['groups']
self.assertQuerysetEqual(
groups_field.queryset,
[repr(g) for g in self.kfet_groups],
ordered=False,
)
def test_keep_others(self):
"""User stays in its non-K-Fêt groups."""
user = self.user
# add user to a non-K-Fêt group
user.groups.add(self.other_group)
# add user to some K-Fêt groups through UserGroupForm
data = {
'groups': [group.pk for group in self.kfet_groups],
}
form = UserGroupForm(data, instance=user)
form.is_valid()
form.save()
self.assertQuerysetEqual(
user.groups.all(),
[repr(g) for g in [self.other_group] + self.kfet_groups],
ordered=False,
)
class KFetGenericUserTests(TestCase):
def test_exists(self):
"""
The account is set up when app is ready, so it should exist.
"""
generic = Account.objects.get_generic()
self.assertEqual(generic.trigramme, KFET_GENERIC_TRIGRAMME)
self.assertEqual(generic.user.username, KFET_GENERIC_USERNAME)
self.assertEqual(get_kfet_generic_user(), generic.user)
##
# Backends
##
class AccountBackendTests(TestCase):
def setUp(self):
self.request = RequestFactory().get('/')
def test_valid(self):
acc = Account(trigramme='000')
acc.change_pwd('valid')
acc.save({'username': 'user'})
auth = AccountBackend().authenticate(
self.request, kfet_password='valid')
self.assertEqual(auth, acc.user)
def test_invalid(self):
auth = AccountBackend().authenticate(
self.request, kfet_password='invalid')
self.assertIsNone(auth)
class GenericBackendTests(TestCase):
def setUp(self):
self.request = RequestFactory().get('/')
def test_valid(self):
token = GenericTeamToken.objects.create_token()
auth = GenericBackend().authenticate(
self.request, kfet_token=token.token)
self.assertEqual(auth, get_kfet_generic_user())
self.assertEqual(GenericTeamToken.objects.all().count(), 0)
def test_invalid(self):
auth = GenericBackend().authenticate(
self.request, kfet_token='invalid')
self.assertIsNone(auth)
##
# Views
##
class GenericLoginViewTests(TestCase):
def setUp(self):
patcher_messages = mock.patch('gestioncof.signals.messages')
patcher_messages.start()
self.addCleanup(patcher_messages.stop)
user_acc = Account(trigramme='000')
user_acc.save({'username': 'user'})
self.user = user_acc.user
self.user.set_password('user')
self.user.save()
team_acc = Account(trigramme='100')
team_acc.save({'username': 'team'})
self.team = team_acc.user
self.team.set_password('team')
self.team.save()
self.team.user_permissions.add(
Permission.objects.get(
content_type__app_label='kfet', codename='is_team'),
)
self.url = reverse('kfet.login.generic')
self.generic_user = get_kfet_generic_user()
def test_url(self):
self.assertEqual(self.url, '/k-fet/login/generic')
def test_notoken_get(self):
"""
Send confirmation for user to emit POST request, instead of GET.
"""
self.client.login(username='team', password='team')
r = self.client.get(self.url)
self.assertEqual(r.status_code, 200)
self.assertTemplateUsed(r, 'kfet/confirm_form.html')
def test_notoken_post(self):
"""
POST request without token in COOKIES sets a token and redirects to
logout url.
"""
self.client.login(username='team', password='team')
r = self.client.post(self.url)
self.assertRedirects(
r, '/logout?next={}'.format(self.url),
fetch_redirect_response=False,
)
def test_notoken_not_team(self):
"""
Logged in user must be a team user to initiate login as generic user.
"""
self.client.login(username='user', password='user')
# With GET.
r = self.client.get(self.url)
self.assertRedirects(
r, '/login?next={}'.format(self.url),
fetch_redirect_response=False,
)
# Also with POST.
r = self.client.post(self.url)
self.assertRedirects(
r, '/login?next={}'.format(self.url),
fetch_redirect_response=False,
)
def _set_signed_cookie(self, client, key, value):
signed_value = signing.get_cookie_signer(salt=key).sign(value)
client.cookies.load({key: signed_value})
def _is_cookie_deleted(self, client, key):
try:
self.assertNotIn(key, client.cookies)
except AssertionError:
try:
cookie = client.cookies[key]
# It also can be emptied.
self.assertEqual(cookie.value, '')
self.assertEqual(
cookie['expires'], 'Thu, 01-Jan-1970 00:00:00 GMT')
self.assertEqual(cookie['max-age'], 0)
except AssertionError:
raise AssertionError("The cookie '%s' still exists." % key)
def test_withtoken_valid(self):
"""
The kfet generic user is logged in.
"""
token = GenericTeamToken.objects.create(token='valid')
self._set_signed_cookie(
self.client, GenericLoginView.TOKEN_COOKIE_NAME, 'valid')
r = self.client.get(self.url)
self.assertRedirects(r, reverse('kfet.kpsul'))
self.assertEqual(r.wsgi_request.user, self.generic_user)
self._is_cookie_deleted(
self.client, GenericLoginView.TOKEN_COOKIE_NAME)
with self.assertRaises(GenericTeamToken.DoesNotExist):
token.refresh_from_db()
def test_withtoken_invalid(self):
"""
If token is invalid, delete it and try again.
"""
self._set_signed_cookie(
self.client, GenericLoginView.TOKEN_COOKIE_NAME, 'invalid')
r = self.client.get(self.url)
self.assertRedirects(r, self.url, fetch_redirect_response=False)
self.assertEqual(r.wsgi_request.user, AnonymousUser())
self._is_cookie_deleted(
self.client, GenericLoginView.TOKEN_COOKIE_NAME)
def test_flow_ok(self):
"""
A team user is logged in as the kfet generic user.
"""
self.client.login(username='team', password='team')
next_url = '/k-fet/'
r = self.client.post(
'{}?next={}'.format(self.url, next_url), follow=True)
self.assertEqual(r.wsgi_request.user, self.generic_user)
self.assertEqual(r.wsgi_request.path, '/k-fet/')
##
# Temporary authentication
#
# Includes:
# - TemporaryAuthMiddleware
# - temporary_auth context processor
##
class TemporaryAuthTests(TestCase):
def setUp(self):
patcher_messages = mock.patch('gestioncof.signals.messages')
patcher_messages.start()
self.addCleanup(patcher_messages.stop)
self.factory = RequestFactory()
user1_acc = Account(trigramme='000')
user1_acc.change_pwd('kfet_user1')
user1_acc.save({'username': 'user1'})
self.user1 = user1_acc.user
self.user1.set_password('user1')
self.user1.save()
user2_acc = Account(trigramme='100')
user2_acc.change_pwd('kfet_user2')
user2_acc.save({'username': 'user2'})
self.user2 = user2_acc.user
self.user2.set_password('user2')
self.user2.save()
self.perm = Permission.objects.get(
content_type__app_label='kfet', codename='is_team')
self.user2.user_permissions.add(self.perm)
def test_middleware_header(self):
"""
A user can be authenticated if ``HTTP_KFETPASSWORD`` header of a
request contains a valid kfet password.
"""
request = self.factory.get('/', HTTP_KFETPASSWORD='kfet_user2')
request.user = self.user1
TemporaryAuthMiddleware().process_request(request)
self.assertEqual(request.user, self.user2)
self.assertEqual(request.real_user, self.user1)
def test_middleware_post(self):
"""
A user can be authenticated if ``KFETPASSWORD`` of POST data contains
a valid kfet password.
"""
request = self.factory.post('/', {'KFETPASSWORD': 'kfet_user2'})
request.user = self.user1
TemporaryAuthMiddleware().process_request(request)
self.assertEqual(request.user, self.user2)
self.assertEqual(request.real_user, self.user1)
def test_middleware_invalid(self):
"""
The given password must be a password of an Account.
"""
request = self.factory.post('/', {'KFETPASSWORD': 'invalid'})
request.user = self.user1
TemporaryAuthMiddleware().process_request(request)
self.assertEqual(request.user, self.user1)
self.assertFalse(hasattr(request, 'real_user'))
def test_context_processor(self):
"""
Context variables give the real authenticated user and his permissions.
"""
self.client.login(username='user1', password='user1')
r = self.client.get('/k-fet/accounts/', HTTP_KFETPASSWORD='kfet_user2')
self.assertEqual(r.context['user'], self.user1)
self.assertNotIn('kfet.is_team', r.context['perms'])
def test_auth_not_persistent(self):
"""
The authentication is temporary, i.e. for one request.
"""
self.client.login(username='user1', password='user1')
r1 = self.client.get(
'/k-fet/accounts/', HTTP_KFETPASSWORD='kfet_user2')
self.assertEqual(r1.wsgi_request.user, self.user2)
r2 = self.client.get('/k-fet/accounts/')
self.assertEqual(r2.wsgi_request.user, self.user1)

34
kfet/auth/utils.py Normal file
View file

@ -0,0 +1,34 @@
import hashlib
from django.contrib.auth import get_user_model
from django.contrib.auth.models import Permission
from kfet.models import Account
User = get_user_model()
def get_kfet_generic_user():
"""
Return the user related to the kfet generic account.
"""
return Account.objects.get_generic().user
def setup_kfet_generic_user(**kwargs):
"""
First steps of setup of the kfet generic user are done in a migration, as
it is more robust against database schema changes.
Following steps cannot be done from migration.
"""
generic = get_kfet_generic_user()
generic.user_permissions.add(
Permission.objects.get(
content_type__app_label='kfet',
codename='is_team',
)
)
def hash_password(password):
return hashlib.sha256(password.encode('utf-8')).hexdigest()

136
kfet/auth/views.py Normal file
View file

@ -0,0 +1,136 @@
from django.contrib import messages
from django.contrib.messages.views import SuccessMessageMixin
from django.contrib.auth import authenticate, login
from django.contrib.auth.decorators import permission_required
from django.contrib.auth.models import Group, User
from django.contrib.auth.views import redirect_to_login
from django.core.urlresolvers import reverse, reverse_lazy
from django.db.models import Prefetch
from django.http import QueryDict
from django.shortcuts import redirect, render
from django.utils.decorators import method_decorator
from django.utils.translation import ugettext_lazy as _
from django.views.generic import View
from django.views.decorators.http import require_http_methods
from django.views.generic.edit import CreateView, UpdateView
from .forms import GroupForm
from .models import GenericTeamToken
class GenericLoginView(View):
"""
View to authenticate as kfet generic user.
It is a 2-step view. First, issue a token if user is a team member and send
him to the logout view (for proper disconnect) with callback url to here.
Then authenticate the token to log in as the kfet generic user.
Token is stored in COOKIES to avoid share it with the authentication
provider, which can be external. Session is unusable as it will be cleared
on logout.
"""
TOKEN_COOKIE_NAME = 'kfettoken'
@method_decorator(require_http_methods(['GET', 'POST']))
def dispatch(self, request, *args, **kwargs):
token = request.get_signed_cookie(self.TOKEN_COOKIE_NAME, None)
if not token:
if not request.user.has_perm('kfet.is_team'):
return redirect_to_login(request.get_full_path())
if request.method == 'POST':
# Step 1: set token and logout user.
return self.prepare_auth()
else:
# GET request should not change server/client states. Send a
# confirmation template to emit a POST request.
return render(request, 'kfet/confirm_form.html', {
'title': _("Ouvrir une session partagée"),
'text': _(
"Êtes-vous sûr·e de vouloir ouvrir une session "
"partagée ?"
),
})
else:
# Step 2: validate token.
return self.validate_auth(token)
def prepare_auth(self):
# Issue token.
token = GenericTeamToken.objects.create_token()
# Prepare callback of logout.
here_url = reverse(login_generic)
if 'next' in self.request.GET:
# Keep given next page.
here_qd = QueryDict(mutable=True)
here_qd['next'] = self.request.GET['next']
here_url += '?{}'.format(here_qd.urlencode())
logout_url = reverse('cof-logout')
logout_qd = QueryDict(mutable=True)
logout_qd['next'] = here_url
logout_url += '?{}'.format(logout_qd.urlencode(safe='/'))
resp = redirect(logout_url)
resp.set_signed_cookie(
self.TOKEN_COOKIE_NAME, token.token, httponly=True)
return resp
def validate_auth(self, token):
# Authenticate with GenericBackend.
user = authenticate(request=self.request, kfet_token=token)
if user:
# Log in generic user.
login(self.request, user)
messages.success(self.request, _(
"K-Fêt — Ouverture d'une session partagée."
))
resp = redirect(self.get_next_url())
else:
# Try again.
resp = redirect(self.request.get_full_path())
# Prevents blocking due to an invalid COOKIE.
resp.delete_cookie(self.TOKEN_COOKIE_NAME)
return resp
def get_next_url(self):
return self.request.GET.get('next', reverse('kfet.kpsul'))
login_generic = GenericLoginView.as_view()
@permission_required('kfet.manage_perms')
def account_group(request):
user_pre = Prefetch(
'user_set',
queryset=User.objects.select_related('profile__account_kfet'),
)
groups = (
Group.objects
.filter(name__icontains='K-Fêt')
.prefetch_related('permissions', user_pre)
)
return render(request, 'kfet/account_group.html', {
'groups': groups,
})
class AccountGroupCreate(SuccessMessageMixin, CreateView):
model = Group
template_name = 'kfet/account_group_form.html'
form_class = GroupForm
success_message = 'Nouveau groupe : %(name)s'
success_url = reverse_lazy('kfet.account.group')
class AccountGroupUpdate(SuccessMessageMixin, UpdateView):
queryset = Group.objects.filter(name__icontains='K-Fêt')
template_name = 'kfet/account_group_form.html'
form_class = GroupForm
success_message = 'Groupe modifié : %(name)s'
success_url = reverse_lazy('kfet.account.group')

View file

@ -1,54 +0,0 @@
# -*- coding: utf-8 -*-
import hashlib
from django.contrib.auth.models import User, Permission
from gestioncof.models import CofProfile
from kfet.models import Account, GenericTeamToken
class KFetBackend(object):
def authenticate(self, request):
password = request.POST.get('KFETPASSWORD', '')
password = request.META.get('HTTP_KFETPASSWORD', password)
if not password:
return None
try:
password_sha256 = (
hashlib.sha256(password.encode('utf-8'))
.hexdigest()
)
account = Account.objects.get(password=password_sha256)
return account.cofprofile.user
except Account.DoesNotExist:
return None
class GenericTeamBackend(object):
def authenticate(self, username=None, token=None):
valid_token = GenericTeamToken.objects.get(token=token)
if username == 'kfet_genericteam' and valid_token:
# Création du user s'il n'existe pas déjà
user, _ = User.objects.get_or_create(username='kfet_genericteam')
profile, _ = CofProfile.objects.get_or_create(user=user)
account, _ = Account.objects.get_or_create(
cofprofile=profile,
trigramme='GNR')
# Ajoute la permission kfet.is_team à ce user
perm_is_team = Permission.objects.get(codename='is_team')
user.user_permissions.add(perm_is_team)
return user
return None
def get_user(self, user_id):
try:
return (
User.objects
.select_related('profile__account_kfet')
.get(pk=user_id)
)
except User.DoesNotExist:
return None

View file

@ -1,18 +1,7 @@
# -*- coding: utf-8 -*-
from django.contrib.auth.context_processors import PermWrapper
from kfet.config import kfet_config
def auth(request):
if hasattr(request, 'real_user'):
return {
'user': request.real_user,
'perms': PermWrapper(request.real_user),
}
return {}
def config(request):
return {'kfet_config': kfet_config}

View file

@ -5,9 +5,8 @@ from decimal import Decimal
from django import forms
from django.core.exceptions import ValidationError
from django.contrib.auth.models import User, Group, Permission
from django.contrib.contenttypes.models import ContentType
from django.forms import modelformset_factory, widgets
from django.contrib.auth.models import User
from django.forms import modelformset_factory
from django.utils import timezone
from djconfig.forms import ConfigForm
@ -18,6 +17,8 @@ from kfet.models import (
TransferGroup, Supplier)
from gestioncof.models import CofProfile
from .auth.forms import UserGroupForm # noqa
# -----
# Widgets
@ -128,60 +129,6 @@ class UserRestrictTeamForm(UserForm):
fields = ['first_name', 'last_name', 'email']
class UserGroupForm(forms.ModelForm):
groups = forms.ModelMultipleChoiceField(
Group.objects.filter(name__icontains='K-Fêt'),
label='Statut équipe',
required=False)
def clean_groups(self):
kfet_groups = self.cleaned_data.get('groups')
other_groups = self.instance.groups.exclude(name__icontains='K-Fêt')
return list(kfet_groups) + list(other_groups)
class Meta:
model = User
fields = ['groups']
class KFetPermissionsField(forms.ModelMultipleChoiceField):
def __init__(self, *args, **kwargs):
queryset = Permission.objects.filter(
content_type__in=ContentType.objects.filter(app_label="kfet"),
)
super().__init__(
queryset=queryset,
widget=widgets.CheckboxSelectMultiple,
*args, **kwargs
)
def label_from_instance(self, obj):
return obj.name
class GroupForm(forms.ModelForm):
permissions = KFetPermissionsField()
def clean_name(self):
name = self.cleaned_data['name']
return 'K-Fêt %s' % name
def clean_permissions(self):
kfet_perms = self.cleaned_data['permissions']
# TODO: With Django >=1.11, the QuerySet method 'difference' can be used.
# other_groups = self.instance.permissions.difference(
# self.fields['permissions'].queryset
# )
other_perms = self.instance.permissions.exclude(
pk__in=[p.pk for p in self.fields['permissions'].queryset],
)
return list(kfet_perms) + list(other_perms)
class Meta:
model = Group
fields = ['name', 'permissions']
class AccountNegativeForm(forms.ModelForm):
class Meta:

View file

@ -1,29 +0,0 @@
# -*- coding: utf-8 -*-
from django.contrib.auth.models import User
from kfet.backends import KFetBackend
class KFetAuthenticationMiddleware(object):
"""Authenticate another user for this request if KFetBackend succeeds.
By the way, if a user is authenticated, we refresh its from db to add
values from CofProfile and Account of this user.
"""
def process_request(self, request):
if request.user.is_authenticated():
# avoid multiple db accesses in views and templates
user_pk = request.user.pk
request.user = (
User.objects
.select_related('profile__account_kfet')
.get(pk=user_pk)
)
kfet_backend = KFetBackend()
temp_request_user = kfet_backend.authenticate(request)
if temp_request_user:
request.real_user = request.user
request.user = temp_request_user

View file

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('kfet', '0057_merge'),
]
operations = [
migrations.DeleteModel(
name='GenericTeamToken',
),
]

View file

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
from kfet.auth import KFET_GENERIC_TRIGRAMME, KFET_GENERIC_USERNAME
def setup_kfet_generic_user(apps, schema_editor):
"""
Setup models instances for the kfet generic account.
Username and trigramme are retrieved from kfet.auth.__init__ module.
Other data are registered here.
See also setup_kfet_generic_user from kfet.auth.utils module.
"""
User = apps.get_model('auth', 'User')
CofProfile = apps.get_model('gestioncof', 'CofProfile')
Account = apps.get_model('kfet', 'Account')
user, _ = User.objects.update_or_create(
username=KFET_GENERIC_USERNAME,
defaults={
'first_name': 'Compte générique K-Fêt',
},
)
profile, _ = CofProfile.objects.update_or_create(user=user)
account, _ = Account.objects.update_or_create(
cofprofile=profile,
defaults={
'trigramme': KFET_GENERIC_TRIGRAMME,
},
)
class Migration(migrations.Migration):
dependencies = [
('kfet', '0058_delete_genericteamtoken'),
]
operations = [
migrations.RunPython(setup_kfet_generic_user),
]

View file

@ -12,7 +12,9 @@ from django.db import transaction
from django.db.models import F
from datetime import date
import re
import hashlib
from .auth import KFET_GENERIC_TRIGRAMME
from .auth.models import GenericTeamToken # noqa
from .config import kfet_config
from .utils import to_ukf
@ -33,6 +35,23 @@ class AccountManager(models.Manager):
return super().get_queryset().select_related('cofprofile__user',
'negative')
def get_generic(self):
"""
Get the kfet generic account instance.
"""
return self.get(trigramme=KFET_GENERIC_TRIGRAMME)
def get_by_password(self, password):
"""
Get a kfet generic account by clear password.
Raises Account.DoesNotExist if no Account has this password.
"""
from .auth.utils import hash_password
if password is None:
raise self.model.DoesNotExist
return self.get(password=hash_password(password))
class Account(models.Model):
objects = AccountManager()
@ -236,10 +255,9 @@ class Account(models.Model):
self.cofprofile = cof
super(Account, self).save(*args, **kwargs)
def change_pwd(self, pwd):
pwd_sha256 = hashlib.sha256(pwd.encode('utf-8'))\
.hexdigest()
self.password = pwd_sha256
def change_pwd(self, clear_password):
from .auth.utils import hash_password
self.password = hash_password(clear_password)
# Surcharge de delete
# Pas de suppression possible
@ -710,7 +728,3 @@ class Operation(models.Model):
return templates[self.type].format(nb=self.article_nb,
article=self.article,
amount=self.amount)
class GenericTeamToken(models.Model):
token = models.CharField(max_length = 50, unique = True)

View file

@ -1,21 +0,0 @@
# -*- coding: utf-8 -*-
from django.contrib import messages
from django.contrib.auth.signals import user_logged_in
from django.core.urlresolvers import reverse
from django.dispatch import receiver
from django.utils.safestring import mark_safe
@receiver(user_logged_in)
def messages_on_login(sender, request, user, **kwargs):
if (not user.username == 'kfet_genericteam' and
user.has_perm('kfet.is_team') and
hasattr(request, 'GET') and
'k-fet' in request.GET.get('next', '')):
messages.info(request, mark_safe(
'<a href="{}" class="genericteam" target="_blank">'
' Connexion en utilisateur partagé ?'
'</a>'
.format(reverse('kfet.login.genericteam'))
))

View file

@ -1,22 +1,33 @@
$(document).ready(function() {
if (typeof Cookies !== 'undefined') {
// Retrieving csrf token
csrftoken = Cookies.get('csrftoken');
// Appending csrf token to ajax post requests
function csrfSafeMethod(method) {
// these HTTP methods do not require CSRF protection
return (/^(GET|HEAD|OPTIONS|TRACE)$/.test(method));
/**
* CSRF Token
*/
var csrftoken = '';
if (typeof Cookies !== 'undefined')
csrftoken = Cookies.get('csrftoken');
// Add CSRF token in header of AJAX requests.
function csrfSafeMethod(method) {
// these HTTP methods do not require CSRF protection
return (/^(GET|HEAD|OPTIONS|TRACE)$/.test(method));
}
$.ajaxSetup({
beforeSend: function(xhr, settings) {
if (!csrfSafeMethod(settings.type) && !this.crossDomain) {
xhr.setRequestHeader("X-CSRFToken", csrftoken);
}
$.ajaxSetup({
beforeSend: function(xhr, settings) {
if (!csrfSafeMethod(settings.type) && !this.crossDomain) {
xhr.setRequestHeader("X-CSRFToken", csrftoken);
}
}
});
}
});
function add_csrf_form($form) {
$form.append(
$('<input>', {'name': 'csrfmiddlewaretoken', 'value': csrftoken})
);
}
/*
* Generic Websocket class and k-psul ws instanciation
*/
@ -199,3 +210,28 @@ jconfirm.defaults = {
confirmButton: '<span class="glyphicon glyphicon-ok"></span>',
cancelButton: '<span class="glyphicon glyphicon-remove"></span>'
};
/**
* Create form node, given an url used as 'action', with csrftoken set.
*/
function create_form(url) {
let $form = $('<form>', {
'action': url,
'method': 'post',
});
add_csrf_form($form);
return $form;
}
/**
* Emit a POST request from <a> tag.
*
* Usage:
* <a href="#" data-url="{target url}" onclick="submit_url(this)">{}</a>
*/
function submit_url(el) {
let url = $(el).data('url');
create_form(url).appendTo($('body')).submit();
}

View file

@ -1,4 +1,4 @@
{% load static %}
{% load i18n static %}
{% load wagtailcore_tags %}
<nav class="navbar navbar-fixed-top">
@ -62,7 +62,8 @@
</ul>
<ul class="nav navbar-nav navbar-right nav-app">
{% if user.username == 'kfet_genericteam' %}
{% include "kfet/nav_item.html" with text="Équipe standard" %}
{% trans "Session partagée" as shared_str %}
{% include "kfet/nav_item.html" with text=shared_str glyphicon="sunglasses" %}
{% elif user.is_authenticated and not user.profile.account_kfet %}
{% include "kfet/nav_item.html" with class="disabled" href="#" glyphicon="user" text="Mon compte" %}
{% elif user.profile.account_kfet.readable %}
@ -87,7 +88,11 @@
<li><a href="{% url 'kfet.order' %}">Commandes</a></li>
{% if user.username != 'kfet_genericteam' %}
<li class="divider"></li>
<li><a href="{% url 'kfet.login.genericteam' %}" target="_blank" class="genericteam">Connexion standard</a></li>
<li>
<a href="#" data-url="{% url "kfet.login.generic" %}" onclick="submit_url(this)">
{% trans "Ouvrir une session partagée" %}
</a>
</li>
{% endif %}
{% if perms.kfet.change_settings %}
<li><a href="{% url 'kfet.settings' %}">Paramètres</a></li>
@ -118,13 +123,3 @@
</ul>
</div>
</nav>
<script type="text/javascript">
$(document).ready(function () {
$('.genericteam').on('click', function () {
setTimeout(function () { location.reload() }, 1000);
});
});
</script>

View file

@ -0,0 +1,20 @@
{% extends "kfet/base_form.html" %}
{% load i18n %}
{% block title %}{{ title }}{% endblock %}
{% block header %}{% endblock %}
{% block main-class %}main-bg main-padding text-center{% endblock %}
{% block main-size %}col-sm-8 col-sm-offset-2 col-md-6 col-md-offset-3{% endblock %}
{% block main %}
<form action="{{ confirm_url }}" method="post">
<p>
{{ text }}
</p>
<button type="submit" class="btn btn-primary">{% trans "Confirmer" %}</button>
{% csrf_token %}
</form>
{% endblock %}

View file

@ -1,7 +0,0 @@
{% extends 'kfet/base.html' %}
{% block extra_head %}
<script type="text/javascript">
close();
</script>
{% endblock %}

View file

@ -4,6 +4,8 @@
<li class="{% if not href %}navbar-text{% endif %} {% if request.path == href %}active{% endif %} {{ class }}">
{% if href %}
<a href="{{ href }}" title="{{ text }}">
{% else %}
<span title="{{ text }}">
{% endif %}<!--
{% if glyphicon %}
--><span class="glyphicon glyphicon-{{ glyphicon }}"></span><!--
@ -14,5 +16,7 @@
-->
{% if href %}
</a>
{% else %}
</span>
{% endif %}
</li>

View file

@ -1,56 +0,0 @@
# -*- coding: utf-8 -*-
from django.test import TestCase
from django.contrib.auth.models import User, Group
from kfet.forms import UserGroupForm
class UserGroupFormTests(TestCase):
"""Test suite for UserGroupForm."""
def setUp(self):
# create user
self.user = User.objects.create(username="foo", password="foo")
# create some K-Fêt groups
prefix_name = "K-Fêt "
names = ["Group 1", "Group 2", "Group 3"]
self.kfet_groups = [
Group.objects.create(name=prefix_name+name)
for name in names
]
# create a non-K-Fêt group
self.other_group = Group.objects.create(name="Other group")
def test_choices(self):
"""Only K-Fêt groups are selectable."""
form = UserGroupForm(instance=self.user)
groups_field = form.fields['groups']
self.assertQuerysetEqual(
groups_field.queryset,
[repr(g) for g in self.kfet_groups],
ordered=False,
)
def test_keep_others(self):
"""User stays in its non-K-Fêt groups."""
user = self.user
# add user to a non-K-Fêt group
user.groups.add(self.other_group)
# add user to some K-Fêt groups through UserGroupForm
data = {
'groups': [group.pk for group in self.kfet_groups],
}
form = UserGroupForm(data, instance=user)
form.is_valid()
form.save()
self.assertQuerysetEqual(
user.groups.all(),
[repr(g) for g in [self.other_group] + self.kfet_groups],
ordered=False,
)

25
kfet/tests/test_models.py Normal file
View file

@ -0,0 +1,25 @@
from django.contrib.auth import get_user_model
from django.test import TestCase
from kfet.models import Account
User = get_user_model()
class AccountTests(TestCase):
def setUp(self):
self.account = Account(trigramme='000')
self.account.save({'username': 'user'})
def test_password(self):
self.account.change_pwd('anna')
self.account.save()
self.assertEqual(Account.objects.get_by_password('anna'), self.account)
with self.assertRaises(Account.DoesNotExist):
Account.objects.get_by_password(None)
with self.assertRaises(Account.DoesNotExist):
Account.objects.get_by_password('bernard')

View file

@ -8,8 +8,8 @@ from kfet.decorators import teamkfet_required
urlpatterns = [
url(r'^login/genericteam$', views.login_genericteam,
name='kfet.login.genericteam'),
url(r'^login/generic$', views.login_generic,
name='kfet.login.generic'),
url(r'^history$', views.history,
name='kfet.history'),

View file

@ -12,34 +12,30 @@ from django.views.generic.edit import CreateView, UpdateView
from django.core.urlresolvers import reverse, reverse_lazy
from django.contrib import messages
from django.contrib.messages.views import SuccessMessageMixin
from django.contrib.auth import authenticate, login
from django.contrib.auth.decorators import login_required, permission_required
from django.contrib.auth.models import User, Permission, Group
from django.contrib.auth.decorators import login_required
from django.contrib.auth.models import User, Permission
from django.http import JsonResponse, Http404
from django.forms import formset_factory
from django.db import transaction
from django.db.models import F, Sum, Prefetch, Count
from django.db.models.functions import Coalesce
from django.utils import timezone
from django.utils.crypto import get_random_string
from django.utils.decorators import method_decorator
from django_cas_ng.views import logout as cas_logout_view
from gestioncof.models import CofProfile
from kfet.config import kfet_config
from kfet.decorators import teamkfet_required
from kfet.models import (
Account, Checkout, Article, AccountNegative,
CheckoutStatement, GenericTeamToken, Supplier, SupplierArticle, Inventory,
CheckoutStatement, Supplier, SupplierArticle, Inventory,
InventoryArticle, Order, OrderArticle, Operation, OperationGroup,
TransferGroup, Transfer, ArticleCategory)
from kfet.forms import (
AccountTriForm, AccountBalanceForm, AccountNoTriForm, UserForm, CofForm,
UserRestrictTeamForm, UserGroupForm, AccountForm, CofRestrictForm,
AccountPwdForm, AccountNegativeForm, UserRestrictForm, AccountRestrictForm,
GroupForm, CheckoutForm, CheckoutRestrictForm, CheckoutStatementCreateForm,
CheckoutForm, CheckoutRestrictForm, CheckoutStatementCreateForm,
CheckoutStatementUpdateForm, ArticleForm, ArticleRestrictForm,
KPsulOperationGroupForm, KPsulAccountForm, KPsulCheckoutForm,
KPsulOperationFormSet, AddcostForm, FilterHistoryForm,
@ -54,25 +50,9 @@ import heapq
import statistics
from kfet.statistic import ScaleMixin, last_stats_manifest, tot_ventes, WeekScale
@teamkfet_required
def login_genericteam(request):
# Check si besoin de déconnecter l'utilisateur de CAS
cas_logout = None
if request.user.profile.login_clipper:
# Récupèration de la vue de déconnexion de CAS
# Ici, car request sera modifié après
next_page = request.META.get('HTTP_REFERER', None)
cas_logout = cas_logout_view(request, next_page=next_page)
# Authentification du compte générique
token = GenericTeamToken.objects.create(token=get_random_string(50))
user = authenticate(username="kfet_genericteam", token=token.token)
login(request, user)
messages.success(request, "Connecté en utilisateur partagé")
return cas_logout or render(request, "kfet/login_genericteam.html")
from .auth.views import ( # noqa
account_group, login_generic, AccountGroupCreate, AccountGroupUpdate,
)
def put_cleaned_data_in_dict(dict, form):
@ -505,37 +485,6 @@ def account_update(request, trigramme):
})
@permission_required('kfet.manage_perms')
def account_group(request):
user_pre = Prefetch(
'user_set',
queryset=User.objects.select_related('profile__account_kfet'),
)
groups = (
Group.objects
.filter(name__icontains='K-Fêt')
.prefetch_related('permissions', user_pre)
)
return render(request, 'kfet/account_group.html', {
'groups': groups,
})
class AccountGroupCreate(SuccessMessageMixin, CreateView):
model = Group
template_name = 'kfet/account_group_form.html'
form_class = GroupForm
success_message = 'Nouveau groupe : %(name)s'
success_url = reverse_lazy('kfet.account.group')
class AccountGroupUpdate(SuccessMessageMixin, UpdateView):
queryset = Group.objects.filter(name__icontains='K-Fêt')
template_name = 'kfet/account_group_form.html'
form_class = GroupForm
success_message = 'Groupe modifié : %(name)s'
success_url = reverse_lazy('kfet.account.group')
class AccountNegativeList(ListView):
queryset = (
AccountNegative.objects