485 lines
15 KiB
Python
485 lines
15 KiB
Python
# -*- coding: utf-8 -*-
|
|
from unittest import mock
|
|
|
|
from django.contrib.auth.models import (
|
|
AnonymousUser, Group as DjangoGroup, User,
|
|
)
|
|
from django.contrib.contenttypes.models import ContentType
|
|
from django.core import signing
|
|
from django.core.urlresolvers import reverse
|
|
from django.test import RequestFactory, TestCase
|
|
|
|
from kfet.models import Account
|
|
|
|
from . import KFET_GENERIC_TRIGRAMME, KFET_GENERIC_USERNAME
|
|
from .backends import AccountBackend, GenericBackend
|
|
from .fields import GroupsField, CorePermissionsField
|
|
from .forms import GroupForm, UserGroupForm
|
|
from .middleware import TemporaryAuthMiddleware
|
|
from .models import GenericTeamToken, Group, Permission
|
|
from .utils import get_kfet_generic_user
|
|
from .views import GenericLoginView
|
|
|
|
|
|
##
|
|
# Forms and form fields
|
|
##
|
|
|
|
|
|
class CorePermissionsFieldTests(TestCase):
    """Check which permissions ``CorePermissionsField`` offers as choices."""

    def setUp(self):
        """Create three permissions on one content type and one on another."""
        token_ct = ContentType.objects.get_for_model(GenericTeamToken)
        self.kf_perm1, self.kf_perm2, self.kf_perm3 = [
            Permission.objects.create(content_type=token_ct, codename=codename)
            for codename in ('code1', 'code2', 'code3')
        ]

        # A permission bound to a different content type; the test below
        # expects the field to leave it out.
        other_ct = ContentType.objects.get_for_model(Permission)
        self.ot_perm = Permission.objects.create(
            content_type=other_ct, codename='code')

    def test_choices(self):
        """
        The field queryset holds the K-Fêt permissions and not the other one.
        """
        queryset = CorePermissionsField().queryset

        for perm in (self.kf_perm1, self.kf_perm2, self.kf_perm3):
            self.assertIn(perm, queryset)
        self.assertNotIn(self.ot_perm, queryset)
|
|
|
|
|
|
class GroupsFieldTests(TestCase):
    """Check which groups ``GroupsField`` offers as choices."""

    def setUp(self):
        """Create three K-Fêt groups and one plain Django group."""
        self.kf_group1, self.kf_group2, self.kf_group3 = [
            Group.objects.create(name=name)
            for name in ('KF Group1', 'KF Group2', 'KF Group3')
        ]
        self.ot_group = DjangoGroup.objects.create(name='OT Group')

    def test_choices(self):
        """
        The field queryset is exactly the three K-Fêt groups, in any order.
        """
        expected = [
            repr(group)
            for group in (self.kf_group1, self.kf_group2, self.kf_group3)
        ]

        self.assertQuerysetEqual(
            GroupsField().queryset, expected, ordered=False)
|
|
|
|
|
|
class GroupFormTests(TestCase):
    """Tests for creating and editing groups through ``GroupForm``."""

    def setUp(self):
        """Create a user, a K-Fêt group, and fetch/create the permissions."""
        self.user = User.objects.create(username='user')
        self.kf_group = Group.objects.create(name='A Group')

        # Permissions looked up through the ``kfetcore`` manager.
        self.kf_perm1 = Permission.kfetcore.get(codename='is_team')
        self.kf_perm2 = Permission.kfetcore.get(codename='add_checkout')

        # A permission created on another content type.
        other_ct = ContentType.objects.get_for_model(Permission)
        self.ot_perm = Permission.objects.create(
            content_type=other_ct, codename='cool')

    def test_creation(self):
        """
        Saving the form creates a group with the given name and permission.
        """
        form = GroupForm({
            'name': 'Another Group',
            'permissions': [self.kf_perm1.pk],
        })
        group = form.save()

        self.assertEqual(group.name, 'Another Group')
        self.assertQuerysetEqual(
            group.permissions.all(), [repr(self.kf_perm1)])

    def test_keep_others(self):
        """
        Non-kfet permissions of Group are kept when the form is submitted.

        Regression test for #168.
        """
        self.kf_group.permissions.add(self.ot_perm)

        chosen = [self.kf_perm1, self.kf_perm2]
        form = GroupForm(
            {
                'name': 'A Group',
                'permissions': [str(perm.pk) for perm in chosen],
            },
            instance=self.kf_group,
        )
        form.save()

        # The pre-existing permission must still be attached after saving.
        expected = [repr(perm) for perm in [self.ot_perm] + chosen]
        self.assertQuerysetEqual(
            self.kf_group.permissions.all(), expected, ordered=False)
|
|
|
|
|
|
class UserGroupFormTests(TestCase):
    """Tests for assigning K-Fêt groups to a user through ``UserGroupForm``."""

    def setUp(self):
        """Create one user, three K-Fêt groups and one plain Django group."""
        # ``create_user`` hashes the password; ``create`` would store the raw
        # string in the password column.
        self.user = User.objects.create_user(username="foo", password="foo")

        # K-Fêt groups.
        self.kf_group1 = Group.objects.create(name='KF Group1')
        self.kf_group2 = Group.objects.create(name='KF Group2')
        self.kf_group3 = Group.objects.create(name='KF Group3')

        # A group created through Django's own ``Group`` model.
        self.ot_group = DjangoGroup.objects.create(name="OT Group")

    def test_keep_others(self):
        """
        User stays in its non-K-Fêt groups.

        Regression test for #161.
        """
        # Put the user in a non-K-Fêt group first.
        self.user.groups.add(self.ot_group)

        # Then select some K-Fêt groups through the form.
        selected = [self.kf_group1, self.kf_group2]
        form = UserGroupForm(
            {'groups': [str(group.pk) for group in selected]},
            instance=self.user,
        )
        form.save()

        # Groups are compared by primary key rather than by repr.
        # NOTE(review): presumably because ``user.groups`` yields
        # ``DjangoGroup`` instances for K-Fêt groups too -- confirm against
        # the models.
        expected_pks = [group.pk for group in [self.ot_group] + selected]
        self.assertQuerysetEqual(
            self.user.groups.all(),
            expected_pks,
            ordered=False,
            transform=lambda group: group.pk,
        )
|
|
|
|
|
|
##
|
|
# Models
|
|
##
|
|
|
|
class PermissionTests(TestCase):
    """Tests for the custom manager exposed on ``Permission``."""

    def test_manager_kfet(self):
        """
        The ``kfetcore`` manager finds K-Fêt permissions and no others.
        """
        token_ct = ContentType.objects.get_for_model(GenericTeamToken)
        kfet_perms = [
            Permission.objects.create(content_type=token_ct, codename=codename)
            for codename in ('code1', 'code2', 'code3')
        ]

        for perm in kfet_perms:
            self.assertEqual(
                Permission.kfetcore.get(codename=perm.codename), perm)

        # A permission on another content type must not be reachable through
        # the ``kfetcore`` manager.
        other_ct = ContentType.objects.get_for_model(Permission)
        Permission.objects.create(content_type=other_ct, codename='code')

        with self.assertRaises(Permission.DoesNotExist):
            Permission.kfetcore.get(codename='code')
|
|
|
|
|
|
##
|
|
# K-Fêt generic user object
|
|
##
|
|
|
|
class KFetGenericUserTests(TestCase):
    """Tests for the K-Fêt generic account and its user."""

    def test_exists(self):
        """
        The generic account exists without being created by the test.

        NOTE(review): the original docstring states it is set up when the
        app is ready -- confirm against the app config.
        """
        account = Account.objects.get_generic()
        user = account.user

        self.assertEqual(account.trigramme, KFET_GENERIC_TRIGRAMME)
        self.assertEqual(user.username, KFET_GENERIC_USERNAME)
        self.assertEqual(get_kfet_generic_user(), user)
|
|
|
|
|
|
##
|
|
# Backends
|
|
##
|
|
|
|
class AccountBackendTests(TestCase):
    """Tests for authentication by K-Fêt password via ``AccountBackend``."""

    def setUp(self):
        """Build a plain GET request to hand to the backend."""
        self.request = RequestFactory().get('/')

    def test_valid(self):
        """A known K-Fêt password authenticates the account's user."""
        account = Account(trigramme='000')
        account.change_pwd('valid')
        account.save({'username': 'user'})

        backend = AccountBackend()
        authenticated = backend.authenticate(
            self.request, kfet_password='valid')

        self.assertEqual(authenticated, account.user)

    def test_invalid(self):
        """An unknown K-Fêt password yields ``None``."""
        backend = AccountBackend()
        authenticated = backend.authenticate(
            self.request, kfet_password='invalid')

        self.assertIsNone(authenticated)
|
|
|
|
|
|
class GenericBackendTests(TestCase):
    """Tests for authentication by token via ``GenericBackend``."""

    def setUp(self):
        """Build a plain GET request to hand to the backend."""
        self.request = RequestFactory().get('/')

    def test_valid(self):
        """A valid token authenticates the generic user and is consumed."""
        team_token = GenericTeamToken.objects.create_token()

        backend = GenericBackend()
        authenticated = backend.authenticate(
            self.request, kfet_token=team_token.token)

        self.assertEqual(authenticated, get_kfet_generic_user())
        # No token is left in the database after a successful authentication.
        self.assertEqual(GenericTeamToken.objects.count(), 0)

    def test_invalid(self):
        """An unknown token yields ``None``."""
        backend = GenericBackend()
        authenticated = backend.authenticate(
            self.request, kfet_token='invalid')

        self.assertIsNone(authenticated)
|
|
|
|
|
|
##
|
|
# Views
|
|
##
|
|
|
|
class GenericLoginViewTests(TestCase):
    """Tests for the view behind the ``kfet.login.generic`` URL."""

    def setUp(self):
        """Create a regular user, a team user, and resolve the view URL."""
        # ``gestioncof.signals.messages`` is replaced by a mock for the
        # duration of each test.
        messages_patcher = mock.patch('gestioncof.signals.messages')
        messages_patcher.start()
        self.addCleanup(messages_patcher.stop)

        self.user = self._create_user('000', 'user', 'user')

        self.team = self._create_user('100', 'team', 'team')
        is_team = Permission.objects.get(
            content_type__app_label='kfet', codename='is_team')
        self.team.user_permissions.add(is_team)

        self.url = reverse('kfet.login.generic')
        self.generic_user = get_kfet_generic_user()

    @staticmethod
    def _create_user(trigramme, username, password):
        """Save an ``Account`` and return its user with a login password."""
        account = Account(trigramme=trigramme)
        account.save({'username': username})

        user = account.user
        user.set_password(password)
        user.save()
        return user

    def test_url(self):
        """The URL name resolves to the expected path."""
        self.assertEqual(self.url, '/k-fet/login/generic')

    def test_notoken_get(self):
        """
        A GET from a team user renders the confirmation form template.
        """
        self.client.login(username='team', password='team')

        response = self.client.get(self.url)

        self.assertEqual(response.status_code, 200)
        self.assertTemplateUsed(response, 'kfet/confirm_form.html')

    def test_notoken_post(self):
        """
        A POST from a team user without a token cookie redirects to the
        logout URL, with the generic login URL as ``next``.
        """
        self.client.login(username='team', password='team')

        response = self.client.post(self.url)

        self.assertRedirects(
            response, '/logout?next={}'.format(self.url),
            fetch_redirect_response=False,
        )

    def test_notoken_not_team(self):
        """
        Logged in user must be a team user to initiate login as generic user.
        """
        self.client.login(username='user', password='user')

        # Both GET and POST are redirected to the login page.
        for send in (self.client.get, self.client.post):
            response = send(self.url)
            self.assertRedirects(
                response, '/login?next={}'.format(self.url),
                fetch_redirect_response=False,
            )

    def _set_signed_cookie(self, client, key, value):
        """Store ``value`` on ``client`` as a cookie signed with salt ``key``."""
        signer = signing.get_cookie_signer(salt=key)
        client.cookies.load({key: signer.sign(value)})

    def _is_cookie_deleted(self, client, key):
        """
        Fail unless cookie ``key`` is absent from ``client`` or was emptied
        and expired.
        """
        if key not in client.cookies:
            return

        # The cookie is still present: it must then be empty and expired.
        cookie = client.cookies[key]
        try:
            self.assertEqual(cookie.value, '')
            self.assertEqual(
                cookie['expires'], 'Thu, 01-Jan-1970 00:00:00 GMT')
            self.assertEqual(cookie['max-age'], 0)
        except AssertionError:
            raise AssertionError("The cookie '%s' still exists." % key)

    def test_withtoken_valid(self):
        """
        With a valid token cookie, the kfet generic user is logged in; the
        cookie and the stored token are both removed.
        """
        cookie_name = GenericLoginView.TOKEN_COOKIE_NAME
        token = GenericTeamToken.objects.create(token='valid')
        self._set_signed_cookie(self.client, cookie_name, 'valid')

        response = self.client.get(self.url)

        self.assertRedirects(response, reverse('kfet.kpsul'))
        self.assertEqual(response.wsgi_request.user, self.generic_user)
        self._is_cookie_deleted(self.client, cookie_name)
        with self.assertRaises(GenericTeamToken.DoesNotExist):
            token.refresh_from_db()

    def test_withtoken_invalid(self):
        """
        With an invalid token cookie, the cookie is removed and the client is
        redirected back to the same URL, still anonymous.
        """
        cookie_name = GenericLoginView.TOKEN_COOKIE_NAME
        self._set_signed_cookie(self.client, cookie_name, 'invalid')

        response = self.client.get(self.url)

        self.assertRedirects(
            response, self.url, fetch_redirect_response=False)
        self.assertEqual(response.wsgi_request.user, AnonymousUser())
        self._is_cookie_deleted(self.client, cookie_name)

    def test_flow_ok(self):
        """
        A team user is logged in as the kfet generic user.
        """
        self.client.login(username='team', password='team')
        next_url = '/k-fet/'
        start_url = '{}?next={}'.format(self.url, next_url)

        # Follow the whole redirect chain down to the final request.
        response = self.client.post(start_url, follow=True)
        final_request = response.wsgi_request

        self.assertEqual(final_request.user, self.generic_user)
        self.assertEqual(final_request.path, '/k-fet/')
|
|
|
|
|
|
##
|
|
# Temporary authentication
|
|
#
|
|
# Includes:
|
|
# - TemporaryAuthMiddleware
|
|
# - temporary_auth context processor
|
|
##
|
|
|
|
class TemporaryAuthTests(TestCase):
    """
    Tests for ``TemporaryAuthMiddleware`` and for the template context seen
    when a K-Fêt password is sent along with a request.
    """

    def setUp(self):
        """Create two accounts; only the second user gets ``is_team``."""
        # ``gestioncof.signals.messages`` is replaced by a mock for the
        # duration of each test.
        messages_patcher = mock.patch('gestioncof.signals.messages')
        messages_patcher.start()
        self.addCleanup(messages_patcher.stop)

        self.factory = RequestFactory()

        self.user1 = self._create_user('000', 'user1', 'kfet_user1')
        self.user2 = self._create_user('100', 'user2', 'kfet_user2')

        self.perm = Permission.objects.get(
            content_type__app_label='kfet', codename='is_team')
        self.user2.user_permissions.add(self.perm)

    @staticmethod
    def _create_user(trigramme, username, kfet_password):
        """
        Save an ``Account`` with the given K-Fêt password and return its
        user, whose login password is set to the username.
        """
        account = Account(trigramme=trigramme)
        account.change_pwd(kfet_password)
        account.save({'username': username})

        user = account.user
        user.set_password(username)
        user.save()
        return user

    def _process_as_user1(self, request):
        """Attach ``user1`` to ``request`` and run the middleware on it."""
        request.user = self.user1
        TemporaryAuthMiddleware().process_request(request)
        return request

    def test_middleware_header(self):
        """
        A user can be authenticated if ``HTTP_KFETPASSWORD`` header of a
        request contains a valid kfet password.
        """
        request = self._process_as_user1(
            self.factory.get('/', HTTP_KFETPASSWORD='kfet_user2'))

        self.assertEqual(request.user, self.user2)
        self.assertEqual(request.real_user, self.user1)

    def test_middleware_post(self):
        """
        A user can be authenticated if ``KFETPASSWORD`` of POST data contains
        a valid kfet password.
        """
        request = self._process_as_user1(
            self.factory.post('/', {'KFETPASSWORD': 'kfet_user2'}))

        self.assertEqual(request.user, self.user2)
        self.assertEqual(request.real_user, self.user1)

    def test_middleware_invalid(self):
        """
        With a password matching no account, the request user is unchanged
        and no ``real_user`` attribute is set.
        """
        request = self._process_as_user1(
            self.factory.post('/', {'KFETPASSWORD': 'invalid'}))

        self.assertEqual(request.user, self.user1)
        self.assertFalse(hasattr(request, 'real_user'))

    def test_context_processor(self):
        """
        Context variables give the real authenticated user and his
        permissions, not those of the temporarily authenticated user.
        """
        self.client.login(username='user1', password='user1')

        response = self.client.get(
            '/k-fet/accounts/', HTTP_KFETPASSWORD='kfet_user2')
        context = response.context

        self.assertEqual(context['user'], self.user1)
        self.assertNotIn('kfet.is_team', context['perms'])

    def test_auth_not_persistent(self):
        """
        The authentication is temporary, i.e. for one request.
        """
        self.client.login(username='user1', password='user1')

        with_password = self.client.get(
            '/k-fet/accounts/', HTTP_KFETPASSWORD='kfet_user2')
        self.assertEqual(with_password.wsgi_request.user, self.user2)

        # The next request, sent without the header, is back to user1.
        without_password = self.client.get('/k-fet/accounts/')
        self.assertEqual(without_password.wsgi_request.user, self.user1)
|