diff --git a/cof/settings/common.py b/cof/settings/common.py index 92759d21..0437f5db 100644 --- a/cof/settings/common.py +++ b/cof/settings/common.py @@ -100,7 +100,7 @@ MIDDLEWARE_CLASSES = [ 'django.middleware.csrf.CsrfViewMiddleware', 'django.contrib.auth.middleware.AuthenticationMiddleware', 'django.contrib.auth.middleware.SessionAuthenticationMiddleware', - 'kfet.auth.middleware.KFetAuthenticationMiddleware', + 'kfet.auth.middleware.TemporaryAuthMiddleware', 'django.contrib.messages.middleware.MessageMiddleware', 'django.middleware.clickjacking.XFrameOptionsMiddleware', 'django.middleware.security.SecurityMiddleware', @@ -128,7 +128,7 @@ TEMPLATES = [ 'wagtailmenus.context_processors.wagtailmenus', 'djconfig.context_processors.config', 'gestioncof.shared.context_processor', - 'kfet.auth.context_processors.auth', + 'kfet.auth.context_processors.temporary_auth', 'kfet.context_processors.config', ], }, @@ -191,7 +191,7 @@ CAS_EMAIL_FORMAT = "%s@clipper.ens.fr" AUTHENTICATION_BACKENDS = ( 'django.contrib.auth.backends.ModelBackend', 'gestioncof.shared.COFCASBackend', - 'kfet.auth.backends.GenericTeamBackend', + 'kfet.auth.backends.GenericBackend', ) RECAPTCHA_USE_SSL = True diff --git a/kfet/apps.py b/kfet/apps.py index 3dd2c0e8..4f114c37 100644 --- a/kfet/apps.py +++ b/kfet/apps.py @@ -11,7 +11,6 @@ class KFetConfig(AppConfig): verbose_name = "Application K-Fêt" def ready(self): - import kfet.signals self.register_config() def register_config(self): diff --git a/kfet/auth/apps.py b/kfet/auth/apps.py index 03742843..d91931f5 100644 --- a/kfet/auth/apps.py +++ b/kfet/auth/apps.py @@ -9,5 +9,6 @@ class KFetAuthConfig(AppConfig): verbose_name = _("K-Fêt - Authentification et Autorisation") def ready(self): + from . import signals # noqa from .utils import setup_kfet_generic_user post_migrate.connect(setup_kfet_generic_user, sender=self) diff --git a/kfet/auth/backends.py b/kfet/auth/backends.py index b2f1cb03..c6ad21b2 100644 --- a/kfet/auth/backends.py +++ b/kfet/auth/backends.py @@ -22,32 +22,22 @@ class BaseKFetBackend: return None -class KFetBackend(BaseKFetBackend): - def authenticate(self, request): - password = request.POST.get('KFETPASSWORD', '') - password = request.META.get('HTTP_KFETPASSWORD', password) - if not password: - return None - +class AccountBackend(BaseKFetBackend): + def authenticate(self, request, kfet_password=None): try: - return Account.objects.get_by_password(password).user + return Account.objects.get_by_password(kfet_password).user except Account.DoesNotExist: return None -class GenericTeamBackend(BaseKFetBackend): - def authenticate(self, username=None, token=None): - valid_token = GenericTeamToken.objects.get(token=token) - if username == 'kfet_genericteam' and valid_token: - return get_kfet_generic_user() - return None - - def get_user(self, user_id): +class GenericBackend(BaseKFetBackend): + def authenticate(self, request, kfet_token=None): try: - return ( - User.objects - .select_related('profile__account_kfet') - .get(pk=user_id) - ) - except User.DoesNotExist: - return None + team_token = GenericTeamToken.objects.get(token=kfet_token) + except GenericTeamToken.DoesNotExist: + return + + # No need to keep the token. + team_token.delete() + + return get_kfet_generic_user() diff --git a/kfet/auth/context_processors.py b/kfet/auth/context_processors.py index 07c9537f..7b59b88b 100644 --- a/kfet/auth/context_processors.py +++ b/kfet/auth/context_processors.py @@ -1,7 +1,7 @@ from django.contrib.auth.context_processors import PermWrapper -def auth(request): +def temporary_auth(request): if hasattr(request, 'real_user'): return { 'user': request.real_user, diff --git a/kfet/auth/middleware.py b/kfet/auth/middleware.py index 1a930c3b..748ce4dd 100644 --- a/kfet/auth/middleware.py +++ b/kfet/auth/middleware.py @@ -1,12 +1,13 @@ # -*- coding: utf-8 -*- +from django.contrib.auth import get_user_model -from django.contrib.auth.models import User +from .backends import AccountBackend -from .backends import KFetBackend +User = get_user_model() -class KFetAuthenticationMiddleware(object): - """Authenticate another user for this request if KFetBackend succeeds. +class TemporaryAuthMiddleware: + """Authenticate another user for this request if AccountBackend succeeds. By the way, if a user is authenticated, we refresh its from db to add values from CofProfile and Account of this user. @@ -15,15 +16,23 @@ class KFetAuthenticationMiddleware(object): def process_request(self, request): if request.user.is_authenticated(): # avoid multiple db accesses in views and templates - user_pk = request.user.pk request.user = ( User.objects .select_related('profile__account_kfet') - .get(pk=user_pk) + .get(pk=request.user.pk) ) - kfet_backend = KFetBackend() - temp_request_user = kfet_backend.authenticate(request) + temp_request_user = AccountBackend().authenticate( + request, + kfet_password=self.get_kfet_password(request), + ) + if temp_request_user: request.real_user = request.user request.user = temp_request_user + + def get_kfet_password(self, request): + return ( + request.META.get('HTTP_KFETPASSWORD') or + request.POST.get('KFETPASSWORD') + ) diff --git a/kfet/auth/models.py b/kfet/auth/models.py index 53aef6c9..ecd40091 100644 --- a/kfet/auth/models.py +++ b/kfet/auth/models.py @@ -1,5 +1,17 @@ from django.db import models +from django.utils.crypto import get_random_string + + +class GenericTeamTokenManager(models.Manager): + + def create_token(self): + token = get_random_string(50) + while self.filter(token=token).exists(): + token = get_random_string(50) + return self.create(token=token) class GenericTeamToken(models.Model): token = models.CharField(max_length=50, unique=True) + + objects = GenericTeamTokenManager() diff --git a/kfet/auth/signals.py b/kfet/auth/signals.py new file mode 100644 index 00000000..3d7af18b --- /dev/null +++ b/kfet/auth/signals.py @@ -0,0 +1,40 @@ +from django.contrib import messages +from django.contrib.auth.signals import user_logged_in +from django.core.urlresolvers import reverse +from django.dispatch import receiver +from django.utils.safestring import mark_safe +from django.utils.translation import ugettext as _ + +from .utils import get_kfet_generic_user + + +@receiver(user_logged_in) +def suggest_auth_generic(sender, request, user, **kwargs): + """ + Suggest logged in user to continue as the kfet generic user. + + Message is only added if the following conditions are met: + - the next page (where user is going to be redirected due to successful + authentication) is related to kfet, i.e. 'k-fet' is in its url. + - logged in user is a kfet staff member (except the generic user). + """ + # Filter against the next page. + if not(hasattr(request, 'GET') and 'next' in request.GET): + return + + next_page = request.GET['next'] + generic_url = reverse('kfet.login.generic') + + if not('k-fet' in next_page and not next_page.startswith(generic_url)): + return + + # Filter against the logged in user. + if not(user.has_perm('kfet.is_team') and user != get_kfet_generic_user()): + return + + # Seems legit to add message. + text = _("K-Fêt — Ouvrir une session partagée ?") + messages.info(request, mark_safe( + '{}' + .format(generic_url, text) + )) diff --git a/kfet/auth/templates/kfet/login_genericteam.html b/kfet/auth/templates/kfet/login_genericteam.html deleted file mode 100644 index d2f8eca0..00000000 --- a/kfet/auth/templates/kfet/login_genericteam.html +++ /dev/null @@ -1,7 +0,0 @@ -{% extends 'kfet/base.html' %} - -{% block extra_head %} - -{% endblock %} diff --git a/kfet/auth/tests.py b/kfet/auth/tests.py index 47cc2376..c2f183cd 100644 --- a/kfet/auth/tests.py +++ b/kfet/auth/tests.py @@ -1,15 +1,26 @@ # -*- coding: utf-8 -*- +from unittest import mock -from django.test import TestCase -from django.contrib.auth.models import User, Group +from django.core import signing +from django.core.urlresolvers import reverse +from django.contrib.auth.models import AnonymousUser, Group, Permission, User +from django.test import RequestFactory, TestCase from kfet.forms import UserGroupForm from kfet.models import Account from . import KFET_GENERIC_TRIGRAMME, KFET_GENERIC_USERNAME +from .backends import AccountBackend, GenericBackend +from .middleware import TemporaryAuthMiddleware +from .models import GenericTeamToken from .utils import get_kfet_generic_user +from .views import GenericLoginView +## +# Forms +## + class UserGroupFormTests(TestCase): """Test suite for UserGroupForm.""" @@ -70,3 +81,287 @@ class KFetGenericUserTests(TestCase): self.assertEqual(generic.trigramme, KFET_GENERIC_TRIGRAMME) self.assertEqual(generic.user.username, KFET_GENERIC_USERNAME) self.assertEqual(get_kfet_generic_user(), generic.user) + + +## +# Backends +## + +class AccountBackendTests(TestCase): + + def setUp(self): + self.request = RequestFactory().get('/') + + def test_valid(self): + acc = Account(trigramme='000') + acc.change_pwd('valid') + acc.save({'username': 'user'}) + + auth = AccountBackend().authenticate( + self.request, kfet_password='valid') + + self.assertEqual(auth, acc.user) + + def test_invalid(self): + auth = AccountBackend().authenticate( + self.request, kfet_password='invalid') + self.assertIsNone(auth) + + +class GenericBackendTests(TestCase): + + def setUp(self): + self.request = RequestFactory().get('/') + + def test_valid(self): + token = GenericTeamToken.objects.create_token() + + auth = GenericBackend().authenticate( + self.request, kfet_token=token.token) + + self.assertEqual(auth, get_kfet_generic_user()) + self.assertEqual(GenericTeamToken.objects.all().count(), 0) + + def test_invalid(self): + auth = GenericBackend().authenticate( + self.request, kfet_token='invalid') + self.assertIsNone(auth) + + +## +# Views +## + +class GenericLoginViewTests(TestCase): + + def setUp(self): + patcher_messages = mock.patch('gestioncof.signals.messages') + patcher_messages.start() + self.addCleanup(patcher_messages.stop) + + user_acc = Account(trigramme='000') + user_acc.save({'username': 'user'}) + self.user = user_acc.user + self.user.set_password('user') + self.user.save() + + team_acc = Account(trigramme='100') + team_acc.save({'username': 'team'}) + self.team = team_acc.user + self.team.set_password('team') + self.team.save() + self.team.user_permissions.add( + Permission.objects.get( + content_type__app_label='kfet', codename='is_team'), + ) + + self.url = reverse('kfet.login.generic') + self.generic_user = get_kfet_generic_user() + + def test_url(self): + self.assertEqual(self.url, '/k-fet/login/generic') + + def test_notoken_get(self): + """ + Send confirmation for user to emit POST request, instead of GET. + """ + self.client.login(username='team', password='team') + + r = self.client.get(self.url) + + self.assertEqual(r.status_code, 200) + self.assertTemplateUsed(r, 'kfet/confirm_form.html') + + def test_notoken_post(self): + """ + POST request without token in COOKIES sets a token and redirects to + logout url. + """ + self.client.login(username='team', password='team') + + r = self.client.post(self.url) + + self.assertRedirects( + r, '/logout?next={}'.format(self.url), + fetch_redirect_response=False, + ) + + def test_notoken_not_team(self): + """ + Logged in user must be a team user to initiate login as generic user. + """ + self.client.login(username='user', password='user') + + # With GET. + r = self.client.get(self.url) + self.assertRedirects( + r, '/login?next={}'.format(self.url), + fetch_redirect_response=False, + ) + + # Also with POST. + r = self.client.post(self.url) + self.assertRedirects( + r, '/login?next={}'.format(self.url), + fetch_redirect_response=False, + ) + + def _set_signed_cookie(self, client, key, value): + signed_value = signing.get_cookie_signer(salt=key).sign(value) + client.cookies.load({key: signed_value}) + + def _is_cookie_deleted(self, client, key): + try: + self.assertNotIn(key, client.cookies) + except AssertionError: + try: + cookie = client.cookies[key] + # It also can be emptied. + self.assertEqual(cookie.value, '') + self.assertEqual( + cookie['expires'], 'Thu, 01-Jan-1970 00:00:00 GMT') + self.assertEqual(cookie['max-age'], 0) + except AssertionError: + raise AssertionError("The cookie '%s' still exists." % key) + + def test_withtoken_valid(self): + """ + The kfet generic user is logged in. + """ + token = GenericTeamToken.objects.create(token='valid') + self._set_signed_cookie( + self.client, GenericLoginView.TOKEN_COOKIE_NAME, 'valid') + + r = self.client.get(self.url) + + self.assertRedirects(r, reverse('kfet.kpsul')) + self.assertEqual(r.wsgi_request.user, self.generic_user) + self._is_cookie_deleted( + self.client, GenericLoginView.TOKEN_COOKIE_NAME) + with self.assertRaises(GenericTeamToken.DoesNotExist): + token.refresh_from_db() + + def test_withtoken_invalid(self): + """ + If token is invalid, delete it and try again. + """ + self._set_signed_cookie( + self.client, GenericLoginView.TOKEN_COOKIE_NAME, 'invalid') + + r = self.client.get(self.url) + + self.assertRedirects(r, self.url, fetch_redirect_response=False) + self.assertEqual(r.wsgi_request.user, AnonymousUser()) + self._is_cookie_deleted( + self.client, GenericLoginView.TOKEN_COOKIE_NAME) + + def test_flow_ok(self): + """ + A team user is logged in as the kfet generic user. + """ + self.client.login(username='team', password='team') + next_url = '/k-fet/' + + r = self.client.post( + '{}?next={}'.format(self.url, next_url), follow=True) + + self.assertEqual(r.wsgi_request.user, self.generic_user) + self.assertEqual(r.wsgi_request.path, '/k-fet/') + + +## +# Temporary authentication +# +# Includes: +# - TemporaryAuthMiddleware +# - temporary_auth context processor +## + +class TemporaryAuthTests(TestCase): + + def setUp(self): + patcher_messages = mock.patch('gestioncof.signals.messages') + patcher_messages.start() + self.addCleanup(patcher_messages.stop) + + self.factory = RequestFactory() + + user1_acc = Account(trigramme='000') + user1_acc.change_pwd('kfet_user1') + user1_acc.save({'username': 'user1'}) + self.user1 = user1_acc.user + self.user1.set_password('user1') + self.user1.save() + + user2_acc = Account(trigramme='100') + user2_acc.change_pwd('kfet_user2') + user2_acc.save({'username': 'user2'}) + self.user2 = user2_acc.user + self.user2.set_password('user2') + self.user2.save() + + self.perm = Permission.objects.get( + content_type__app_label='kfet', codename='is_team') + self.user2.user_permissions.add(self.perm) + + def test_middleware_header(self): + """ + A user can be authenticated if ``HTTP_KFETPASSWORD`` header of a + request contains a valid kfet password. + """ + request = self.factory.get('/', HTTP_KFETPASSWORD='kfet_user2') + request.user = self.user1 + + TemporaryAuthMiddleware().process_request(request) + + self.assertEqual(request.user, self.user2) + self.assertEqual(request.real_user, self.user1) + + def test_middleware_post(self): + """ + A user can be authenticated if ``KFETPASSWORD`` of POST data contains + a valid kfet password. + """ + request = self.factory.post('/', {'KFETPASSWORD': 'kfet_user2'}) + request.user = self.user1 + + TemporaryAuthMiddleware().process_request(request) + + self.assertEqual(request.user, self.user2) + self.assertEqual(request.real_user, self.user1) + + def test_middleware_invalid(self): + """ + The given password must be a password of an Account. + """ + request = self.factory.post('/', {'KFETPASSWORD': 'invalid'}) + request.user = self.user1 + + TemporaryAuthMiddleware().process_request(request) + + self.assertEqual(request.user, self.user1) + self.assertFalse(hasattr(request, 'real_user')) + + def test_context_processor(self): + """ + Context variables give the real authenticated user and his permissions. + """ + self.client.login(username='user1', password='user1') + + r = self.client.get('/k-fet/accounts/', HTTP_KFETPASSWORD='kfet_user2') + + self.assertEqual(r.context['user'], self.user1) + self.assertNotIn('kfet.is_team', r.context['perms']) + + def test_auth_not_persistent(self): + """ + The authentication is temporary, i.e. for one request. + """ + self.client.login(username='user1', password='user1') + + r1 = self.client.get( + '/k-fet/accounts/', HTTP_KFETPASSWORD='kfet_user2') + self.assertEqual(r1.wsgi_request.user, self.user2) + + r2 = self.client.get('/k-fet/accounts/') + self.assertEqual(r2.wsgi_request.user, self.user1) diff --git a/kfet/auth/views.py b/kfet/auth/views.py index ce44b007..7b9f4099 100644 --- a/kfet/auth/views.py +++ b/kfet/auth/views.py @@ -3,38 +3,105 @@ from django.contrib.messages.views import SuccessMessageMixin from django.contrib.auth import authenticate, login from django.contrib.auth.decorators import permission_required from django.contrib.auth.models import Group, User -from django.core.urlresolvers import reverse_lazy +from django.contrib.auth.views import redirect_to_login +from django.core.urlresolvers import reverse, reverse_lazy from django.db.models import Prefetch -from django.shortcuts import render -from django.utils.crypto import get_random_string +from django.http import QueryDict +from django.shortcuts import redirect, render +from django.utils.decorators import method_decorator +from django.utils.translation import ugettext_lazy as _ +from django.views.generic import View +from django.views.decorators.http import require_http_methods from django.views.generic.edit import CreateView, UpdateView -from django_cas_ng.views import logout as cas_logout_view - -from kfet.decorators import teamkfet_required - from .forms import GroupForm from .models import GenericTeamToken -@teamkfet_required -def login_genericteam(request): - # Check si besoin de déconnecter l'utilisateur de CAS - cas_logout = None - if request.user.profile.login_clipper: - # Récupèration de la vue de déconnexion de CAS - # Ici, car request sera modifié après - next_page = request.META.get('HTTP_REFERER', None) - cas_logout = cas_logout_view(request, next_page=next_page) +class GenericLoginView(View): + """ + View to authenticate as kfet generic user. - # Authentification du compte générique - token = GenericTeamToken.objects.create(token=get_random_string(50)) - user = authenticate(username="kfet_genericteam", token=token.token) - login(request, user) + It is a 2-step view. First, issue a token if user is a team member and send + him to the logout view (for proper disconnect) with callback url to here. + Then authenticate the token to log in as the kfet generic user. - messages.success(request, "Connecté en utilisateur partagé") + Token is stored in COOKIES to avoid share it with the authentication + provider, which can be external. Session is unusable as it will be cleared + on logout. + """ + TOKEN_COOKIE_NAME = 'kfettoken' - return cas_logout or render(request, "kfet/login_genericteam.html") + @method_decorator(require_http_methods(['GET', 'POST'])) + def dispatch(self, request, *args, **kwargs): + token = request.get_signed_cookie(self.TOKEN_COOKIE_NAME, None) + if not token: + if not request.user.has_perm('kfet.is_team'): + return redirect_to_login(request.get_full_path()) + + if request.method == 'POST': + # Step 1: set token and logout user. + return self.prepare_auth() + else: + # GET request should not change server/client states. Send a + # confirmation template to emit a POST request. + return render(request, 'kfet/confirm_form.html', { + 'title': _("Ouvrir une session partagée"), + 'text': _( + "Êtes-vous sûr·e de vouloir ouvrir une session " + "partagée ?" + ), + }) + else: + # Step 2: validate token. + return self.validate_auth(token) + + def prepare_auth(self): + # Issue token. + token = GenericTeamToken.objects.create_token() + + # Prepare callback of logout. + here_url = reverse(login_generic) + if 'next' in self.request.GET: + # Keep given next page. + here_qd = QueryDict(mutable=True) + here_qd['next'] = self.request.GET['next'] + here_url += '?{}'.format(here_qd.urlencode()) + + logout_url = reverse('cof-logout') + logout_qd = QueryDict(mutable=True) + logout_qd['next'] = here_url + logout_url += '?{}'.format(logout_qd.urlencode(safe='/')) + + resp = redirect(logout_url) + resp.set_signed_cookie( + self.TOKEN_COOKIE_NAME, token.token, httponly=True) + return resp + + def validate_auth(self, token): + # Authenticate with GenericBackend. + user = authenticate(request=self.request, kfet_token=token) + + if user: + # Log in generic user. + login(self.request, user) + messages.success(self.request, _( + "K-Fêt — Ouverture d'une session partagée." + )) + resp = redirect(self.get_next_url()) + else: + # Try again. + resp = redirect(self.request.get_full_path()) + + # Prevents blocking due to an invalid COOKIE. + resp.delete_cookie(self.TOKEN_COOKIE_NAME) + return resp + + def get_next_url(self): + return self.request.GET.get('next', reverse('kfet.kpsul')) + + +login_generic = GenericLoginView.as_view() @permission_required('kfet.manage_perms') diff --git a/kfet/signals.py b/kfet/signals.py deleted file mode 100644 index c677ac9c..00000000 --- a/kfet/signals.py +++ /dev/null @@ -1,21 +0,0 @@ -# -*- coding: utf-8 -*- - -from django.contrib import messages -from django.contrib.auth.signals import user_logged_in -from django.core.urlresolvers import reverse -from django.dispatch import receiver -from django.utils.safestring import mark_safe - - -@receiver(user_logged_in) -def messages_on_login(sender, request, user, **kwargs): - if (not user.username == 'kfet_genericteam' and - user.has_perm('kfet.is_team') and - hasattr(request, 'GET') and - 'k-fet' in request.GET.get('next', '')): - messages.info(request, mark_safe( - '' - ' Connexion en utilisateur partagé ?' - '' - .format(reverse('kfet.login.genericteam')) - )) diff --git a/kfet/static/kfet/js/kfet.js b/kfet/static/kfet/js/kfet.js index b34f2005..75b80b04 100644 --- a/kfet/static/kfet/js/kfet.js +++ b/kfet/static/kfet/js/kfet.js @@ -1,22 +1,33 @@ -$(document).ready(function() { - if (typeof Cookies !== 'undefined') { - // Retrieving csrf token - csrftoken = Cookies.get('csrftoken'); - // Appending csrf token to ajax post requests - function csrfSafeMethod(method) { - // these HTTP methods do not require CSRF protection - return (/^(GET|HEAD|OPTIONS|TRACE)$/.test(method)); +/** + * CSRF Token + */ + +var csrftoken = ''; +if (typeof Cookies !== 'undefined') + csrftoken = Cookies.get('csrftoken'); + +// Add CSRF token in header of AJAX requests. + +function csrfSafeMethod(method) { + // these HTTP methods do not require CSRF protection + return (/^(GET|HEAD|OPTIONS|TRACE)$/.test(method)); +} + +$.ajaxSetup({ + beforeSend: function(xhr, settings) { + if (!csrfSafeMethod(settings.type) && !this.crossDomain) { + xhr.setRequestHeader("X-CSRFToken", csrftoken); } - $.ajaxSetup({ - beforeSend: function(xhr, settings) { - if (!csrfSafeMethod(settings.type) && !this.crossDomain) { - xhr.setRequestHeader("X-CSRFToken", csrftoken); - } - } - }); } }); +function add_csrf_form($form) { + $form.append( + $('', {'name': 'csrfmiddlewaretoken', 'value': csrftoken}) + ); +} + + /* * Generic Websocket class and k-psul ws instanciation */ @@ -199,3 +210,28 @@ jconfirm.defaults = { confirmButton: '', cancelButton: '' }; + + +/** + * Create form node, given an url used as 'action', with csrftoken set. + */ +function create_form(url) { + let $form = $('
+ +{% endblock %} diff --git a/kfet/templates/kfet/nav_item.html b/kfet/templates/kfet/nav_item.html index 8bc311b8..b5981266 100644 --- a/kfet/templates/kfet/nav_item.html +++ b/kfet/templates/kfet/nav_item.html @@ -4,6 +4,8 @@