# kpsul/kfet/open/tests.py
#
# 384 lines
# 14 KiB
# Python

import random
from datetime import timedelta
from unittest import mock
from asgiref.sync import async_to_sync
from channels.auth import AuthMiddlewareStack
from channels.consumer import get_channel_layer
from channels.testing import WebsocketCommunicator
from django.contrib.auth.models import AnonymousUser, Permission, User
from django.test import Client, TestCase
from django.utils import timezone
from . import OpenKfet
from .consumers import OpenKfetConsumer
def ws_communicator(cls, path: str, headers=None):
    """Build a WebSocket test communicator for a consumer class.

    The consumer's ASGI application (``cls.as_asgi()``) is wrapped in
    ``AuthMiddlewareStack`` before being handed to ``WebsocketCommunicator``.

    Args:
        cls: Consumer class; must provide ``as_asgi()``.
        path: Path of the WebSocket endpoint.
        headers: Optional headers forwarded to the communicator. Defaults to
            a new empty list.

    Returns:
        The ``WebsocketCommunicator`` instance; callers connect it themselves.
    """
    # Use a fresh list per call: a mutable default would be a single object
    # shared by every call that omits ``headers``.
    if headers is None:
        headers = []
    return WebsocketCommunicator(AuthMiddlewareStack(cls.as_asgi()), path, headers)
class OpenKfetTest(TestCase):
    """Unit tests for the OpenKfet object."""

    def setUp(self):
        """Build an OpenKfet under a random cache prefix; clear it on cleanup."""
        prefix = "test_kfetopen_%s" % random.randrange(2**20)
        self.kfet_open = OpenKfet(cache_prefix=prefix)
        self.addCleanup(self.kfet_open.clear_cache)

    def test_defaults(self):
        """Default values of a fresh instance."""
        kfet_open = self.kfet_open
        self.assertFalse(kfet_open.raw_open)
        self.assertIsNone(kfet_open.last_update)
        self.assertFalse(kfet_open.force_close)
        self.assertFalse(kfet_open.is_open)

    def test_raw_open(self):
        """raw_open can be read back after being set; last_update changes."""
        kfet_open = self.kfet_open
        for value in (True, False):
            before = kfet_open.last_update
            kfet_open.raw_open = value
            self.assertEqual(value, kfet_open.raw_open)
            self.assertNotEqual(before, kfet_open.last_update)

    def test_force_close(self):
        """force_close can be read back after being set."""
        kfet_open = self.kfet_open
        for value in (True, False):
            kfet_open.force_close = value
            self.assertEqual(value, kfet_open.force_close)

    def test_is_open(self):
        """With force_close disabled, is_open mirrors raw_open."""
        kfet_open = self.kfet_open
        kfet_open.force_close = False
        for value in (True, False):
            kfet_open.raw_open = value
            self.assertEqual(value, kfet_open.is_open)

    def test_is_open_force_close(self):
        """With force_close enabled, is_open is always False."""
        kfet_open = self.kfet_open
        kfet_open.force_close = True
        for value in (True, False):
            kfet_open.raw_open = value
            self.assertFalse(kfet_open.is_open)

    def test_status(self):
        """status() and admin_status() for each raw_open/force_close pair."""
        # Each row: raw_open, force_close, expected status, expected admin status
        expectations = (
            (False, False, OpenKfet.CLOSED, OpenKfet.CLOSED),
            (False, True, OpenKfet.CLOSED, OpenKfet.CLOSED),
            (True, False, OpenKfet.OPENED, OpenKfet.OPENED),
            (True, True, OpenKfet.CLOSED, OpenKfet.FAKE_CLOSED),
        )
        kfet_open = self.kfet_open
        for raw_open, force_close, status, admin_status in expectations:
            kfet_open.raw_open = raw_open
            kfet_open.force_close = force_close
            self.assertEqual(status, kfet_open.status())
            self.assertEqual(admin_status, kfet_open.admin_status())

    def test_status_unknown(self):
        """status() is UNKNOWN when the last update is 30 days old."""
        self.kfet_open.raw_open = True
        thirty_days_ago = timezone.now() - timedelta(days=30)
        self.kfet_open._last_update = thirty_days_ago
        self.assertEqual(OpenKfet.UNKNOWN, self.kfet_open.status())

    def test_export_user(self):
        """Export is limited for an anonymous user."""
        exported = self.kfet_open.export(AnonymousUser())
        self.assertSetEqual({"status", "type"}, set(exported))

    def test_export_team(self):
        """Export all values for a team member."""
        member = User.objects.create_user("team", "", "team")
        team_perm = Permission.objects.get(
            codename="is_team", content_type__app_label="kfet"
        )
        member.user_permissions.add(team_perm)

        exported = self.kfet_open.export(member)
        expected_keys = {"status", "admin_status", "force_close", "type"}
        self.assertSetEqual(expected_keys, set(exported))

    async def test_send_ws(self):
        """send_ws() delivers a message to both the base and the team group."""
        layer = get_channel_layer()

        # One listening channel per group.
        base_channel = await layer.new_channel()
        team_channel = await layer.new_channel()
        await layer.group_add("kfet.open.base", base_channel)
        await layer.group_add("kfet.open.team", team_channel)

        await self.kfet_open.send_ws()

        base_msg = await layer.receive(base_channel)
        self.assertSetEqual({"status", "type"}, set(base_msg))

        team_msg = await layer.receive(team_channel)
        expected_keys = {"status", "admin_status", "force_close", "type"}
        self.assertSetEqual(expected_keys, set(team_msg))
class OpenKfetViewsTest(TestCase):
    """Unit tests for the OpenKfet views."""

    def setUp(self):
        """Create plain/team/admin users with logged-in clients; patch the views."""
        # Must be patched here, before any '<client>.login' call below.
        messages_patch = mock.patch("gestioncof.signals.messages")
        messages_patch.start()
        self.addCleanup(messages_patch.stop)

        def kfet_perm(codename):
            # Look up a permission of the 'kfet' app by codename.
            return Permission.objects.get(
                codename=codename, content_type__app_label="kfet"
            )

        is_team = kfet_perm("is_team")
        can_force_close = kfet_perm("can_force_close")

        # Authenticated user without any permission, and its client.
        self.u = User.objects.create_user("user", "", "user")
        self.c = Client()
        self.c.login(username="user", password="user")

        # Team user and its client.
        self.t = User.objects.create_user("team", "", "team")
        self.t.user_permissions.add(is_team)
        self.c_t = Client()
        self.c_t.login(username="team", password="team")

        # Admin user (team + can_force_close) and its client.
        self.a = User.objects.create_user("admin", "", "admin")
        self.a.user_permissions.add(is_team, can_force_close)
        self.c_a = Client()
        self.c_a.login(username="admin", password="admin")

        prefix = "test_kfetopen_%s" % random.randrange(2**20)
        self.kfet_open = OpenKfet(cache_prefix=prefix)
        self.addCleanup(self.kfet_open.clear_cache)

        # Make the views use this test's OpenKfet instance.
        kfet_open_patch = mock.patch("kfet.open.views.kfet_open", self.kfet_open)
        kfet_open_patch.start()
        self.addCleanup(kfet_open_patch.stop)

    def test_door(self):
        """Posting raw_open updates raw_open on the OpenKfet instance."""
        for sent, expected in ((1, True), (0, False)):
            payload = {"raw_open": sent, "token": "plop"}
            resp = Client().post("/k-fet/open/raw_open", payload)
            self.assertEqual(200, resp.status_code)
            self.assertEqual(expected, self.kfet_open.raw_open)

    def test_force_close(self):
        """The admin client can edit force_close."""
        for sent, expected in ((1, True), (0, False)):
            payload = {"force_close": sent}
            resp = self.c_a.post("/k-fet/open/force_close", payload)
            self.assertEqual(200, resp.status_code)
            self.assertEqual(expected, self.kfet_open.force_close)

    def test_force_close_forbidden(self):
        """Can't edit force_close without kfet.can_force_close permission."""
        # Anonymous, plain authenticated, and team clients are all rejected.
        for client in (Client(), self.c, self.c_t):
            resp = client.post("/k-fet/open/force_close", {"force_close": 0})
            self.assertEqual(403, resp.status_code)
class OpenKfetConsumerTest(TestCase):
    """Unit tests for the OpenKfet websocket consumer."""

    @classmethod
    def setUpTestData(cls):
        """Create a user holding the kfet 'is_team' permission."""
        team_perm = Permission.objects.get(
            codename="is_team", content_type__app_label="kfet"
        )
        member = User.objects.create_user("team", "", "team")
        member.user_permissions.add(team_perm)
        cls.team_user = member

    async def test_standard_user(self):
        """Lambda user is added to kfet.open.base group."""
        # Anonymous client.
        communicator = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
        connected, _ = await communicator.connect()
        self.assertTrue(connected)

        # Initialization data is sent on connection.
        init_msg = await communicator.receive_json_from()
        self.assertIsNotNone(init_msg)

        layer = get_channel_layer()

        # The client receives what is sent to the 'kfet.open.base' group...
        await layer.group_send(
            "kfet.open.base", {"test": "plop", "type": "open.status"}
        )
        received = await communicator.receive_json_from()
        self.assertEqual(received, {"test": "plop"})

        # ...but nothing of what is sent to the 'kfet.open.team' group.
        await layer.group_send(
            "kfet.open.team", {"test": "plop", "type": "open.status"}
        )
        self.assertTrue(await communicator.receive_nothing())

    async def test_team_user(self):
        """Team user is added to kfet.open.team group."""
        # Team membership is simulated through a mock: async tests running
        # against postgres break everything when the db is modified inside one
        # of the sub-tests.
        messages_patch = mock.patch("gestioncof.signals.messages")
        is_team_patch = mock.patch(
            "kfet.open.consumers.kfet_is_team", return_value=True
        )
        with messages_patch, is_team_patch:
            communicator = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
            connected, _ = await communicator.connect()
            layer = get_channel_layer()
            self.assertTrue(connected)

            # Initialization data is sent on connection.
            init_msg = await communicator.receive_json_from()
            self.assertIsNotNone(init_msg)

            # The client receives what is sent to the 'kfet.open.team' group...
            await layer.group_send(
                "kfet.open.team", {"test": "plop", "type": "open.status"}
            )
            received = await communicator.receive_json_from()
            self.assertEqual(received, {"test": "plop"})

            # ...but nothing of what is sent to the 'kfet.open.base' group.
            await layer.group_send(
                "kfet.open.base", {"test": "plop", "type": "open.status"}
            )
            self.assertTrue(await communicator.receive_nothing())
class OpenKfetScenarioTest(TestCase):
    """Functional tests for OpenKfet (views, channel groups, consumer)."""

    @classmethod
    def setUpTestData(cls):
        """Create the superuser plus an anonymous and a logged-in client."""
        # Root user.
        cls.r = User.objects.create_superuser("team", "", "team")
        # Anonymous client (for views).
        cls.c = Client()
        # Root client; 'messages' is patched for the duration of the login.
        cls.r_c = Client()
        with mock.patch("gestioncof.signals.messages"):
            cls.r_c.login(username="team", password="team")

    def setUp(self):
        """Subscribe one channel to each group and build a test OpenKfet."""
        layer = get_channel_layer()

        # Channels used to listen to the messages sent to each group.
        self.channel = async_to_sync(layer.new_channel)()
        self.team_channel = async_to_sync(layer.new_channel)()
        async_to_sync(layer.group_add)("kfet.open.base", self.channel)
        async_to_sync(layer.group_add)("kfet.open.team", self.team_channel)

        def receive_msg(channel):
            # Synchronous receive of the next message on ``channel``.
            return async_to_sync(layer.receive)(channel)

        self.receive_msg = receive_msg

        prefix = "test_kfetopen_%s" % random.randrange(2**20)
        self.kfet_open = OpenKfet(cache_prefix=prefix)
        self.addCleanup(self.kfet_open.clear_cache)

    async def ws_connect(self, ws_communicator):
        """Connect the communicator and return the first JSON message received."""
        connected, _ = await ws_communicator.connect()
        self.assertTrue(connected)
        first_message = await ws_communicator.receive_json_from()
        return first_message

    def test_scenario_1(self):
        """Clients connect, door opens, enable force close."""
        # The door says "I'm open!".
        self.c.post("/k-fet/open/raw_open", {"raw_open": True, "token": "plop"})

        # The base group sees it opened...
        base_msg = self.receive_msg(self.channel)
        self.assertEqual(OpenKfet.OPENED, base_msg["status"])
        # ...and so does the team group.
        team_msg = self.receive_msg(self.team_channel)
        self.assertEqual(OpenKfet.OPENED, team_msg["status"])
        self.assertEqual(OpenKfet.OPENED, team_msg["admin_status"])

        # Root says "no, it's closed".
        self.r_c.post("/k-fet/open/force_close", {"force_close": True})

        # The base group now sees it closed...
        base_msg = self.receive_msg(self.channel)
        self.assertEqual(OpenKfet.CLOSED, base_msg["status"])
        # ...the team group too...
        team_msg = self.receive_msg(self.team_channel)
        self.assertEqual(OpenKfet.CLOSED, team_msg["status"])
        # ...but the team message also carries the real state.
        self.assertEqual(OpenKfet.FAKE_CLOSED, team_msg["admin_status"])
        self.assertTrue(team_msg["force_close"])

    def test_scenario_2(self):
        """Starting falsely closed, clients connect, disable force close."""
        self.kfet_open.raw_open = True
        self.kfet_open.force_close = True

        # NOTE(review): the message is sent from a fresh OpenKfet(), not from
        # self.kfet_open (which was built with its own cache_prefix) -- confirm
        # both are expected to expose the same state.
        async_to_sync(OpenKfet().send_ws)()

        base_msg = self.receive_msg(self.channel)
        self.assertEqual(OpenKfet.CLOSED, base_msg["status"])
        team_msg = self.receive_msg(self.team_channel)
        self.assertEqual(OpenKfet.CLOSED, team_msg["status"])
        self.assertEqual(OpenKfet.FAKE_CLOSED, team_msg["admin_status"])
        self.assertTrue(team_msg["force_close"])

        # Root disables force close.
        self.r_c.post("/k-fet/open/force_close", {"force_close": False})

        base_msg = self.receive_msg(self.channel)
        self.assertEqual(OpenKfet.OPENED, base_msg["status"])
        team_msg = self.receive_msg(self.team_channel)
        self.assertEqual(OpenKfet.OPENED, team_msg["status"])
        self.assertEqual(OpenKfet.OPENED, team_msg["admin_status"])
        self.assertFalse(team_msg["force_close"])

    async def test_scenario_3(self):
        """Clients connect."""
        # Anonymous websocket client: only the status is received.
        self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
        anon_msg = await self.ws_connect(self.c_ws)
        self.assertSetEqual({"status"}, set(anon_msg))

        # Team websocket client: kfet_is_team is patched to return True in
        # both modules named below.
        consumers_patch = mock.patch(
            "kfet.open.consumers.kfet_is_team", return_value=True
        )
        open_patch = mock.patch("kfet.open.open.kfet_is_team", return_value=True)
        with consumers_patch, open_patch:
            self.r_c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
            team_msg = await self.ws_connect(self.r_c_ws)
            self.assertSetEqual(
                {"status", "admin_status", "force_close"}, set(team_msg)
            )