From 9a7deb5e2a8d0e16a6a7644a019fd696b54a554a Mon Sep 17 00:00:00 2001 From: Tom Hubrecht Date: Wed, 29 Jun 2022 13:34:52 +0200 Subject: [PATCH] Fix kfet.ope tests --- kfet/open/tests.py | 245 ++++++++++++++++++++++++++++----------------- 1 file changed, 152 insertions(+), 93 deletions(-) diff --git a/kfet/open/tests.py b/kfet/open/tests.py index 7918ace7..2c3f9c4e 100644 --- a/kfet/open/tests.py +++ b/kfet/open/tests.py @@ -1,19 +1,24 @@ -import json import random from datetime import timedelta from unittest import mock -from channels.channel import Group -from channels.test import ChannelTestCase, WSClient +from asgiref.sync import async_to_sync, sync_to_async +from channels.auth import AuthMiddlewareStack +from channels.consumer import get_channel_layer +from channels.testing import WebsocketCommunicator from django.contrib.auth.models import AnonymousUser, Permission, User -from django.test import Client +from django.test import Client, TestCase from django.utils import timezone from . import OpenKfet from .consumers import OpenKfetConsumer -class OpenKfetTest(ChannelTestCase): +def ws_communicator(cls, path: str, headers=[]): + return WebsocketCommunicator(AuthMiddlewareStack(cls.as_asgi()), path, headers) + + +class OpenKfetTest(TestCase): """OpenKfet object unit-tests suite.""" def setUp(self): @@ -79,7 +84,7 @@ class OpenKfetTest(ChannelTestCase): def test_export_user(self): """Export is limited for an anonymous user.""" export = self.kfet_open.export(AnonymousUser()) - self.assertSetEqual(set(["status"]), set(export)) + self.assertSetEqual(set(["status", "type"]), set(export)) def test_export_team(self): """Export all values for a team member.""" @@ -89,24 +94,32 @@ class OpenKfetTest(ChannelTestCase): ) user.user_permissions.add(is_team) export = self.kfet_open.export(user) - self.assertSetEqual(set(["status", "admin_status", "force_close"]), set(export)) + self.assertSetEqual( + set(["status", "admin_status", "force_close", "type"]), set(export) + ) - def test_send_ws(self): - Group("kfet.open.base").add("test.open.base") - Group("kfet.open.team").add("test.open.team") + async def test_send_ws(self): + channel_layer = get_channel_layer() + base_channel = await channel_layer.new_channel() + team_channel = await channel_layer.new_channel() - self.kfet_open.send_ws() + await channel_layer.group_add("kfet.open.base", base_channel) + await channel_layer.group_add("kfet.open.team", team_channel) - recv_base = self.get_next_message("test.open.base", require=True) - base = json.loads(recv_base["text"]) - self.assertSetEqual(set(["status"]), set(base)) + await self.kfet_open.send_ws() - recv_admin = self.get_next_message("test.open.team", require=True) - admin = json.loads(recv_admin["text"]) - self.assertSetEqual(set(["status", "admin_status", "force_close"]), set(admin)) + base = await channel_layer.receive(base_channel) + + self.assertSetEqual(set(["status", "type"]), set(base)) + + team = await channel_layer.receive(team_channel) + + self.assertSetEqual( + set(["status", "admin_status", "force_close", "type"]), set(team) + ) -class OpenKfetViewsTest(ChannelTestCase): +class OpenKfetViewsTest(TestCase): """OpenKfet views unit-tests suite.""" def setUp(self): @@ -177,60 +190,82 @@ class OpenKfetViewsTest(ChannelTestCase): self.assertEqual(403, resp.status_code) -class OpenKfetConsumerTest(ChannelTestCase): +class OpenKfetConsumerTest(TestCase): """OpenKfet consumer unit-tests suite.""" - def test_standard_user(self): - """Lambda user is added to kfet.open.base group.""" - # setup anonymous client - c = WSClient() - - # connect - c.send_and_consume( - "websocket.connect", path="/ws/k-fet/open", fail_on_none=True - ) - - # initialization data is replied on connection - self.assertIsNotNone(c.receive()) - - # client belongs to the 'kfet.open' group... - OpenKfetConsumer.group_send("kfet.open.base", {"test": "plop"}) - self.assertEqual(c.receive(), {"test": "plop"}) - - # ...but not to the 'kfet.open.admin' one - OpenKfetConsumer.group_send("kfet.open.team", {"test": "plop"}) - self.assertIsNone(c.receive()) - - @mock.patch("gestioncof.signals.messages") - def test_team_user(self, mock_messages): - """Team user is added to kfet.open.team group.""" - # setup team user and its client + def setUp(self): t = User.objects.create_user("team", "", "team") is_team = Permission.objects.get( codename="is_team", content_type__app_label="kfet" ) t.user_permissions.add(is_team) - c = WSClient() - c.force_login(t, backend="django.contrib.auth.backends.ModelBackend") - # connect - c.send_and_consume( - "websocket.connect", path="/ws/k-fet/open", fail_on_none=True - ) + self.team_user = t + + @async_to_sync + async def test_standard_user(self): + """Lambda user is added to kfet.open.base group.""" + # setup anonymous client + c = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") + + connected, _ = await c.connect() + + self.assertTrue(connected) # initialization data is replied on connection - self.assertIsNotNone(c.receive()) + message = await c.receive_json_from() + self.assertIsNotNone(message) - # client belongs to the 'kfet.open.admin' group... - OpenKfetConsumer.group_send("kfet.open.team", {"test": "plop"}) - self.assertEqual(c.receive(), {"test": "plop"}) + # client belongs to the 'kfet.open' group... + channel_layer = get_channel_layer() - # ... but not to the 'kfet.open' one - OpenKfetConsumer.group_send("kfet.open.base", {"test": "plop"}) - self.assertIsNone(c.receive()) + await channel_layer.group_send( + "kfet.open.base", {"test": "plop", "type": "open.status"} + ) + message = await c.receive_json_from() + + self.assertEqual(message, {"test": "plop"}) + + # ...but not to the 'kfet.open.admin' one + await channel_layer.group_send( + "kfet.open.team", {"test": "plop", "type": "open.status"} + ) + self.assertTrue(await c.receive_nothing()) + + async def test_team_user(self): + """Team user is added to kfet.open.team group.""" + + with mock.patch("gestioncof.signals.messages"), mock.patch( + "kfet.open.consumers.kfet_is_team", return_value=True + ): + c = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") + + connected, _ = await c.connect() + + channel_layer = get_channel_layer() + + self.assertTrue(connected) + + # initialization data is replied on connection + message = await c.receive_json_from() + self.assertIsNotNone(message) + + # client belongs to the 'kfet.open.team' group... + await channel_layer.group_send( + "kfet.open.team", {"test": "plop", "type": "open.status"} + ) + message = await c.receive_json_from() + + self.assertEqual(message, {"test": "plop"}) + + # ...but not to the 'kfet.open' one + await channel_layer.group_send( + "kfet.open.base", {"test": "plop", "type": "open.status"} + ) + self.assertTrue(await c.receive_nothing()) -class OpenKfetScenarioTest(ChannelTestCase): +class OpenKfetScenarioTest(TestCase): """OpenKfet functionnal tests suite.""" def setUp(self): @@ -241,91 +276,115 @@ class OpenKfetScenarioTest(ChannelTestCase): # anonymous client (for views) self.c = Client() - # anonymous client (for websockets) - self.c_ws = WSClient() # root user self.r = User.objects.create_superuser("root", "", "root") - # its client (for views) + + # root client self.r_c = Client() self.r_c.login(username="root", password="root") - # its client (for websockets) - self.r_c_ws = WSClient() - self.r_c_ws.force_login( - self.r, backend="django.contrib.auth.backends.ModelBackend" - ) self.kfet_open = OpenKfet( cache_prefix="test_kfetopen_%s" % random.randrange(2**20) ) self.addCleanup(self.kfet_open.clear_cache) - def ws_connect(self, ws_client): - ws_client.send_and_consume( - "websocket.connect", path="/ws/k-fet/open", fail_on_none=True - ) - return ws_client.receive(json=True) + async def ws_connect(self, ws_client): + c, _ = await ws_client.connect() - def test_scenario_0(self): + self.assertTrue(c) + + return await ws_client.receive_json_from() + + async def test_scenario_0(self): """Clients connect.""" + # anonymous client (for websockets) + self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") + # test for anonymous user - msg = self.ws_connect(self.c_ws) + msg = await self.ws_connect(self.c_ws) self.assertSetEqual(set(["status"]), set(msg)) # test for root user - msg = self.ws_connect(self.r_c_ws) - self.assertSetEqual(set(["status", "admin_status", "force_close"]), set(msg)) + with mock.patch( + "kfet.open.consumers.kfet_is_team", return_value=True + ), mock.patch("kfet.open.open.kfet_is_team", return_value=True): + self.r_c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") - def test_scenario_1(self): + msg = await self.ws_connect(self.r_c_ws) + self.assertSetEqual( + set(["status", "admin_status", "force_close"]), set(msg) + ) + + async def test_scenario_1(self): """Clients connect, door opens, enable force close.""" - self.ws_connect(self.c_ws) - self.ws_connect(self.r_c_ws) + + self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") + await self.ws_connect(self.c_ws) + + with mock.patch( + "kfet.open.consumers.kfet_is_team", return_value=True + ), mock.patch("kfet.open.open.kfet_is_team", return_value=True): + self.r_c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") + await self.ws_connect(self.r_c_ws) # door sent "I'm open!" - self.c.post("/k-fet/open/raw_open", {"raw_open": True, "token": "plop"}) + await sync_to_async(self.c.post)( + "/k-fet/open/raw_open", {"raw_open": True, "token": "plop"} + ) # anonymous user agree - msg = self.c_ws.receive(json=True) + msg = await self.c_ws.receive_json_from() self.assertEqual(OpenKfet.OPENED, msg["status"]) # root user too - msg = self.r_c_ws.receive(json=True) + msg = await self.r_c_ws.receive_json_from() self.assertEqual(OpenKfet.OPENED, msg["status"]) self.assertEqual(OpenKfet.OPENED, msg["admin_status"]) # admin says "no it's closed" - self.r_c.post("/k-fet/open/force_close", {"force_close": True}) + await sync_to_async(self.r_c.post)( + "/k-fet/open/force_close", {"force_close": True} + ) # so anonymous user see it's closed - msg = self.c_ws.receive(json=True) + msg = await self.c_ws.receive_json_from() self.assertEqual(OpenKfet.CLOSED, msg["status"]) # root user too - msg = self.r_c_ws.receive(json=True) + msg = await self.r_c_ws.receive_json_from() self.assertEqual(OpenKfet.CLOSED, msg["status"]) # but root knows things self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"]) self.assertTrue(msg["force_close"]) - def test_scenario_2(self): + async def test_scenario_2(self): """Starting falsely closed, clients connect, disable force close.""" self.kfet_open.raw_open = True self.kfet_open.force_close = True - msg = self.ws_connect(self.c_ws) + self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") + + msg = await self.ws_connect(self.c_ws) self.assertEqual(OpenKfet.CLOSED, msg["status"]) - msg = self.ws_connect(self.r_c_ws) - self.assertEqual(OpenKfet.CLOSED, msg["status"]) - self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"]) - self.assertTrue(msg["force_close"]) + with mock.patch( + "kfet.open.consumers.kfet_is_team", return_value=True + ), mock.patch("kfet.open.open.kfet_is_team", return_value=True): + self.r_c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") + msg = await self.ws_connect(self.r_c_ws) + self.assertEqual(OpenKfet.CLOSED, msg["status"]) + self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"]) + self.assertTrue(msg["force_close"]) - self.r_c.post("/k-fet/open/force_close", {"force_close": False}) + await sync_to_async(self.r_c.post)( + "/k-fet/open/force_close", {"force_close": False} + ) - msg = self.c_ws.receive(json=True) + msg = await self.c_ws.receive_json_from() self.assertEqual(OpenKfet.OPENED, msg["status"]) - msg = self.r_c_ws.receive(json=True) + msg = await self.r_c_ws.receive_json_from() self.assertEqual(OpenKfet.OPENED, msg["status"]) self.assertEqual(OpenKfet.OPENED, msg["admin_status"]) self.assertFalse(msg["force_close"])