Thubrecht/daphne #815

Merged
thubrecht merged 39 commits from thubrecht/daphne into master 2024-07-30 18:40:56 +02:00
Showing only changes of commit 3fee014384 - Show all commits

View file

@ -2,7 +2,7 @@ import random
from datetime import timedelta from datetime import timedelta
from unittest import mock from unittest import mock
from asgiref.sync import async_to_sync, sync_to_async from asgiref.sync import async_to_sync
from channels.auth import AuthMiddlewareStack from channels.auth import AuthMiddlewareStack
from channels.consumer import get_channel_layer from channels.consumer import get_channel_layer
from channels.testing import WebsocketCommunicator from channels.testing import WebsocketCommunicator
@ -193,16 +193,16 @@ class OpenKfetViewsTest(TestCase):
class OpenKfetConsumerTest(TestCase): class OpenKfetConsumerTest(TestCase):
"""OpenKfet consumer unit-tests suite.""" """OpenKfet consumer unit-tests suite."""
def setUp(self): @classmethod
def setUpTestData(cls):
t = User.objects.create_user("team", "", "team") t = User.objects.create_user("team", "", "team")
is_team = Permission.objects.get( is_team = Permission.objects.get(
codename="is_team", content_type__app_label="kfet" codename="is_team", content_type__app_label="kfet"
) )
t.user_permissions.add(is_team) t.user_permissions.add(is_team)
self.team_user = t cls.team_user = t
@async_to_sync
async def test_standard_user(self): async def test_standard_user(self):
"""Lambda user is added to kfet.open.base group.""" """Lambda user is added to kfet.open.base group."""
# setup anonymous client # setup anonymous client
@ -268,35 +268,97 @@ class OpenKfetConsumerTest(TestCase):
class OpenKfetScenarioTest(TestCase): class OpenKfetScenarioTest(TestCase):
"""OpenKfet functionnal tests suite.""" """OpenKfet functionnal tests suite."""
def setUp(self): @classmethod
# Need this (and here) because of '<client>.login' in setUp def setUpTestData(cls):
patcher_messages = mock.patch("gestioncof.signals.messages") # root user
patcher_messages.start() cls.r = User.objects.create_superuser("team", "", "team")
self.addCleanup(patcher_messages.stop)
# anonymous client (for views) # anonymous client (for views)
self.c = Client() cls.c = Client()
# root user
self.r = User.objects.create_superuser("root", "", "root")
# root client # root client
self.r_c = Client() cls.r_c = Client()
self.r_c.login(username="root", password="root")
with mock.patch("gestioncof.signals.messages"):
cls.r_c.login(username="team", password="team")
def setUp(self):
# Create a channel to listen to KPsul's messages
channel_layer = get_channel_layer()
self.channel = async_to_sync(channel_layer.new_channel)()
self.team_channel = async_to_sync(channel_layer.new_channel)()
async_to_sync(channel_layer.group_add)("kfet.open.base", self.channel)
async_to_sync(channel_layer.group_add)("kfet.open.team", self.team_channel)
self.receive_msg = lambda c: async_to_sync(channel_layer.receive)(c)
self.kfet_open = OpenKfet( self.kfet_open = OpenKfet(
cache_prefix="test_kfetopen_%s" % random.randrange(2**20) cache_prefix="test_kfetopen_%s" % random.randrange(2**20)
) )
self.addCleanup(self.kfet_open.clear_cache) self.addCleanup(self.kfet_open.clear_cache)
async def ws_connect(self, ws_client): async def ws_connect(self, ws_communicator):
c, _ = await ws_client.connect() c, _ = await ws_communicator.connect()
self.assertTrue(c) self.assertTrue(c)
return await ws_communicator.receive_json_from()
return await ws_client.receive_json_from() def test_scenario_1(self):
"""Clients connect, door opens, enable force close."""
async def test_scenario_0(self): # door sent "I'm open!"
self.c.post("/k-fet/open/raw_open", {"raw_open": True, "token": "plop"})
# anonymous user agree
msg = self.receive_msg(self.channel)
self.assertEqual(OpenKfet.OPENED, msg["status"])
# root user too
msg = self.receive_msg(self.team_channel)
self.assertEqual(OpenKfet.OPENED, msg["status"])
self.assertEqual(OpenKfet.OPENED, msg["admin_status"])
# admin says "no it's closed"
self.r_c.post("/k-fet/open/force_close", {"force_close": True})
# so anonymous user see it's closed
msg = self.receive_msg(self.channel)
self.assertEqual(OpenKfet.CLOSED, msg["status"])
# root user too
msg = self.receive_msg(self.team_channel)
self.assertEqual(OpenKfet.CLOSED, msg["status"])
# but root knows things
self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"])
self.assertTrue(msg["force_close"])
def test_scenario_2(self):
"""Starting falsely closed, clients connect, disable force close."""
self.kfet_open.raw_open = True
self.kfet_open.force_close = True
async_to_sync(OpenKfet().send_ws)()
msg = self.receive_msg(self.channel)
self.assertEqual(OpenKfet.CLOSED, msg["status"])
msg = self.receive_msg(self.team_channel)
self.assertEqual(OpenKfet.CLOSED, msg["status"])
self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"])
self.assertTrue(msg["force_close"])
self.r_c.post("/k-fet/open/force_close", {"force_close": False})
msg = self.receive_msg(self.channel)
self.assertEqual(OpenKfet.OPENED, msg["status"])
msg = self.receive_msg(self.team_channel)
self.assertEqual(OpenKfet.OPENED, msg["status"])
self.assertEqual(OpenKfet.OPENED, msg["admin_status"])
self.assertFalse(msg["force_close"])
async def test_scenario_3(self):
"""Clients connect.""" """Clients connect."""
# anonymous client (for websockets) # anonymous client (for websockets)
self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
@ -315,76 +377,3 @@ class OpenKfetScenarioTest(TestCase):
self.assertSetEqual( self.assertSetEqual(
set(["status", "admin_status", "force_close"]), set(msg) set(["status", "admin_status", "force_close"]), set(msg)
) )
async def test_scenario_1(self):
"""Clients connect, door opens, enable force close."""
self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
await self.ws_connect(self.c_ws)
with mock.patch(
"kfet.open.consumers.kfet_is_team", return_value=True
), mock.patch("kfet.open.open.kfet_is_team", return_value=True):
self.r_c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
await self.ws_connect(self.r_c_ws)
# door sent "I'm open!"
await sync_to_async(self.c.post)(
"/k-fet/open/raw_open", {"raw_open": True, "token": "plop"}
)
# anonymous user agree
msg = await self.c_ws.receive_json_from()
self.assertEqual(OpenKfet.OPENED, msg["status"])
# root user too
msg = await self.r_c_ws.receive_json_from()
self.assertEqual(OpenKfet.OPENED, msg["status"])
self.assertEqual(OpenKfet.OPENED, msg["admin_status"])
# admin says "no it's closed"
await sync_to_async(self.r_c.post)(
"/k-fet/open/force_close", {"force_close": True}
)
# so anonymous user see it's closed
msg = await self.c_ws.receive_json_from()
self.assertEqual(OpenKfet.CLOSED, msg["status"])
# root user too
msg = await self.r_c_ws.receive_json_from()
self.assertEqual(OpenKfet.CLOSED, msg["status"])
# but root knows things
self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"])
self.assertTrue(msg["force_close"])
async def test_scenario_2(self):
"""Starting falsely closed, clients connect, disable force close."""
self.kfet_open.raw_open = True
self.kfet_open.force_close = True
self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
msg = await self.ws_connect(self.c_ws)
self.assertEqual(OpenKfet.CLOSED, msg["status"])
with mock.patch(
"kfet.open.consumers.kfet_is_team", return_value=True
), mock.patch("kfet.open.open.kfet_is_team", return_value=True):
self.r_c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
msg = await self.ws_connect(self.r_c_ws)
self.assertEqual(OpenKfet.CLOSED, msg["status"])
self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"])
self.assertTrue(msg["force_close"])
await sync_to_async(self.r_c.post)(
"/k-fet/open/force_close", {"force_close": False}
)
msg = await self.c_ws.receive_json_from()
self.assertEqual(OpenKfet.OPENED, msg["status"])
msg = await self.r_c_ws.receive_json_from()
self.assertEqual(OpenKfet.OPENED, msg["status"])
self.assertEqual(OpenKfet.OPENED, msg["admin_status"])
self.assertFalse(msg["force_close"])