diff --git a/kfet/open/tests.py b/kfet/open/tests.py index 2c3f9c4e..9eb07697 100644 --- a/kfet/open/tests.py +++ b/kfet/open/tests.py @@ -2,7 +2,7 @@ import random from datetime import timedelta from unittest import mock -from asgiref.sync import async_to_sync, sync_to_async +from asgiref.sync import async_to_sync from channels.auth import AuthMiddlewareStack from channels.consumer import get_channel_layer from channels.testing import WebsocketCommunicator @@ -193,16 +193,16 @@ class OpenKfetViewsTest(TestCase): class OpenKfetConsumerTest(TestCase): """OpenKfet consumer unit-tests suite.""" - def setUp(self): + @classmethod + def setUpTestData(cls): t = User.objects.create_user("team", "", "team") is_team = Permission.objects.get( codename="is_team", content_type__app_label="kfet" ) t.user_permissions.add(is_team) - self.team_user = t + cls.team_user = t - @async_to_sync async def test_standard_user(self): """Lambda user is added to kfet.open.base group.""" # setup anonymous client @@ -268,35 +268,97 @@ class OpenKfetConsumerTest(TestCase): class OpenKfetScenarioTest(TestCase): """OpenKfet functionnal tests suite.""" - def setUp(self): - # Need this (and here) because of '.login' in setUp - patcher_messages = mock.patch("gestioncof.signals.messages") - patcher_messages.start() - self.addCleanup(patcher_messages.stop) + @classmethod + def setUpTestData(cls): + # root user + cls.r = User.objects.create_superuser("team", "", "team") # anonymous client (for views) - self.c = Client() - - # root user - self.r = User.objects.create_superuser("root", "", "root") + cls.c = Client() # root client - self.r_c = Client() - self.r_c.login(username="root", password="root") + cls.r_c = Client() + + with mock.patch("gestioncof.signals.messages"): + cls.r_c.login(username="team", password="team") + + def setUp(self): + # Create a channel to listen to KPsul's messages + channel_layer = get_channel_layer() + self.channel = async_to_sync(channel_layer.new_channel)() + self.team_channel = async_to_sync(channel_layer.new_channel)() + + async_to_sync(channel_layer.group_add)("kfet.open.base", self.channel) + async_to_sync(channel_layer.group_add)("kfet.open.team", self.team_channel) + + self.receive_msg = lambda c: async_to_sync(channel_layer.receive)(c) self.kfet_open = OpenKfet( cache_prefix="test_kfetopen_%s" % random.randrange(2**20) ) self.addCleanup(self.kfet_open.clear_cache) - async def ws_connect(self, ws_client): - c, _ = await ws_client.connect() + async def ws_connect(self, ws_communicator): + c, _ = await ws_communicator.connect() self.assertTrue(c) + return await ws_communicator.receive_json_from() - return await ws_client.receive_json_from() + def test_scenario_1(self): + """Clients connect, door opens, enable force close.""" - async def test_scenario_0(self): + # door sent "I'm open!" + self.c.post("/k-fet/open/raw_open", {"raw_open": True, "token": "plop"}) + + # anonymous user agree + msg = self.receive_msg(self.channel) + self.assertEqual(OpenKfet.OPENED, msg["status"]) + + # root user too + msg = self.receive_msg(self.team_channel) + self.assertEqual(OpenKfet.OPENED, msg["status"]) + self.assertEqual(OpenKfet.OPENED, msg["admin_status"]) + + # admin says "no it's closed" + self.r_c.post("/k-fet/open/force_close", {"force_close": True}) + + # so anonymous user see it's closed + msg = self.receive_msg(self.channel) + self.assertEqual(OpenKfet.CLOSED, msg["status"]) + + # root user too + msg = self.receive_msg(self.team_channel) + self.assertEqual(OpenKfet.CLOSED, msg["status"]) + # but root knows things + self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"]) + self.assertTrue(msg["force_close"]) + + def test_scenario_2(self): + """Starting falsely closed, clients connect, disable force close.""" + self.kfet_open.raw_open = True + self.kfet_open.force_close = True + + async_to_sync(OpenKfet().send_ws)() + + msg = self.receive_msg(self.channel) + self.assertEqual(OpenKfet.CLOSED, msg["status"]) + + msg = self.receive_msg(self.team_channel) + self.assertEqual(OpenKfet.CLOSED, msg["status"]) + self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"]) + self.assertTrue(msg["force_close"]) + + self.r_c.post("/k-fet/open/force_close", {"force_close": False}) + + msg = self.receive_msg(self.channel) + self.assertEqual(OpenKfet.OPENED, msg["status"]) + + msg = self.receive_msg(self.team_channel) + self.assertEqual(OpenKfet.OPENED, msg["status"]) + self.assertEqual(OpenKfet.OPENED, msg["admin_status"]) + self.assertFalse(msg["force_close"]) + + async def test_scenario_3(self): """Clients connect.""" # anonymous client (for websockets) self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") @@ -315,76 +377,3 @@ class OpenKfetScenarioTest(TestCase): self.assertSetEqual( set(["status", "admin_status", "force_close"]), set(msg) ) - - async def test_scenario_1(self): - """Clients connect, door opens, enable force close.""" - - self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") - await self.ws_connect(self.c_ws) - - with mock.patch( - "kfet.open.consumers.kfet_is_team", return_value=True - ), mock.patch("kfet.open.open.kfet_is_team", return_value=True): - self.r_c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") - await self.ws_connect(self.r_c_ws) - - # door sent "I'm open!" - await sync_to_async(self.c.post)( - "/k-fet/open/raw_open", {"raw_open": True, "token": "plop"} - ) - - # anonymous user agree - msg = await self.c_ws.receive_json_from() - self.assertEqual(OpenKfet.OPENED, msg["status"]) - - # root user too - msg = await self.r_c_ws.receive_json_from() - self.assertEqual(OpenKfet.OPENED, msg["status"]) - self.assertEqual(OpenKfet.OPENED, msg["admin_status"]) - - # admin says "no it's closed" - await sync_to_async(self.r_c.post)( - "/k-fet/open/force_close", {"force_close": True} - ) - - # so anonymous user see it's closed - msg = await self.c_ws.receive_json_from() - self.assertEqual(OpenKfet.CLOSED, msg["status"]) - - # root user too - msg = await self.r_c_ws.receive_json_from() - self.assertEqual(OpenKfet.CLOSED, msg["status"]) - # but root knows things - self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"]) - self.assertTrue(msg["force_close"]) - - async def test_scenario_2(self): - """Starting falsely closed, clients connect, disable force close.""" - self.kfet_open.raw_open = True - self.kfet_open.force_close = True - - self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") - - msg = await self.ws_connect(self.c_ws) - self.assertEqual(OpenKfet.CLOSED, msg["status"]) - - with mock.patch( - "kfet.open.consumers.kfet_is_team", return_value=True - ), mock.patch("kfet.open.open.kfet_is_team", return_value=True): - self.r_c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") - msg = await self.ws_connect(self.r_c_ws) - self.assertEqual(OpenKfet.CLOSED, msg["status"]) - self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"]) - self.assertTrue(msg["force_close"]) - - await sync_to_async(self.r_c.post)( - "/k-fet/open/force_close", {"force_close": False} - ) - - msg = await self.c_ws.receive_json_from() - self.assertEqual(OpenKfet.OPENED, msg["status"]) - - msg = await self.r_c_ws.receive_json_from() - self.assertEqual(OpenKfet.OPENED, msg["status"]) - self.assertEqual(OpenKfet.OPENED, msg["admin_status"]) - self.assertFalse(msg["force_close"])