Fix kfet.ope tests

This commit is contained in:
Tom Hubrecht 2022-06-29 13:34:52 +02:00 committed by thubrecht
parent 693e4252d5
commit 4108efe8c9

View file

@ -1,19 +1,24 @@
import json
import random import random
from datetime import timedelta from datetime import timedelta
from unittest import mock from unittest import mock
from channels.channel import Group from asgiref.sync import async_to_sync, sync_to_async
from channels.test import ChannelTestCase, WSClient from channels.auth import AuthMiddlewareStack
from channels.consumer import get_channel_layer
from channels.testing import WebsocketCommunicator
from django.contrib.auth.models import AnonymousUser, Permission, User from django.contrib.auth.models import AnonymousUser, Permission, User
from django.test import Client from django.test import Client, TestCase
from django.utils import timezone from django.utils import timezone
from . import OpenKfet from . import OpenKfet
from .consumers import OpenKfetConsumer from .consumers import OpenKfetConsumer
class OpenKfetTest(ChannelTestCase): def ws_communicator(cls, path: str, headers=[]):
return WebsocketCommunicator(AuthMiddlewareStack(cls.as_asgi()), path, headers)
class OpenKfetTest(TestCase):
"""OpenKfet object unit-tests suite.""" """OpenKfet object unit-tests suite."""
def setUp(self): def setUp(self):
@ -79,7 +84,7 @@ class OpenKfetTest(ChannelTestCase):
def test_export_user(self): def test_export_user(self):
"""Export is limited for an anonymous user.""" """Export is limited for an anonymous user."""
export = self.kfet_open.export(AnonymousUser()) export = self.kfet_open.export(AnonymousUser())
self.assertSetEqual(set(["status"]), set(export)) self.assertSetEqual(set(["status", "type"]), set(export))
def test_export_team(self): def test_export_team(self):
"""Export all values for a team member.""" """Export all values for a team member."""
@ -89,24 +94,32 @@ class OpenKfetTest(ChannelTestCase):
) )
user.user_permissions.add(is_team) user.user_permissions.add(is_team)
export = self.kfet_open.export(user) export = self.kfet_open.export(user)
self.assertSetEqual(set(["status", "admin_status", "force_close"]), set(export)) self.assertSetEqual(
set(["status", "admin_status", "force_close", "type"]), set(export)
)
def test_send_ws(self): async def test_send_ws(self):
Group("kfet.open.base").add("test.open.base") channel_layer = get_channel_layer()
Group("kfet.open.team").add("test.open.team") base_channel = await channel_layer.new_channel()
team_channel = await channel_layer.new_channel()
self.kfet_open.send_ws() await channel_layer.group_add("kfet.open.base", base_channel)
await channel_layer.group_add("kfet.open.team", team_channel)
recv_base = self.get_next_message("test.open.base", require=True) await self.kfet_open.send_ws()
base = json.loads(recv_base["text"])
self.assertSetEqual(set(["status"]), set(base))
recv_admin = self.get_next_message("test.open.team", require=True) base = await channel_layer.receive(base_channel)
admin = json.loads(recv_admin["text"])
self.assertSetEqual(set(["status", "admin_status", "force_close"]), set(admin)) self.assertSetEqual(set(["status", "type"]), set(base))
team = await channel_layer.receive(team_channel)
self.assertSetEqual(
set(["status", "admin_status", "force_close", "type"]), set(team)
)
class OpenKfetViewsTest(ChannelTestCase): class OpenKfetViewsTest(TestCase):
"""OpenKfet views unit-tests suite.""" """OpenKfet views unit-tests suite."""
def setUp(self): def setUp(self):
@ -177,60 +190,82 @@ class OpenKfetViewsTest(ChannelTestCase):
self.assertEqual(403, resp.status_code) self.assertEqual(403, resp.status_code)
class OpenKfetConsumerTest(ChannelTestCase): class OpenKfetConsumerTest(TestCase):
"""OpenKfet consumer unit-tests suite.""" """OpenKfet consumer unit-tests suite."""
def test_standard_user(self): def setUp(self):
"""Lambda user is added to kfet.open.base group."""
# setup anonymous client
c = WSClient()
# connect
c.send_and_consume(
"websocket.connect", path="/ws/k-fet/open", fail_on_none=True
)
# initialization data is replied on connection
self.assertIsNotNone(c.receive())
# client belongs to the 'kfet.open' group...
OpenKfetConsumer.group_send("kfet.open.base", {"test": "plop"})
self.assertEqual(c.receive(), {"test": "plop"})
# ...but not to the 'kfet.open.admin' one
OpenKfetConsumer.group_send("kfet.open.team", {"test": "plop"})
self.assertIsNone(c.receive())
@mock.patch("gestioncof.signals.messages")
def test_team_user(self, mock_messages):
"""Team user is added to kfet.open.team group."""
# setup team user and its client
t = User.objects.create_user("team", "", "team") t = User.objects.create_user("team", "", "team")
is_team = Permission.objects.get( is_team = Permission.objects.get(
codename="is_team", content_type__app_label="kfet" codename="is_team", content_type__app_label="kfet"
) )
t.user_permissions.add(is_team) t.user_permissions.add(is_team)
c = WSClient()
c.force_login(t, backend="django.contrib.auth.backends.ModelBackend")
# connect self.team_user = t
c.send_and_consume(
"websocket.connect", path="/ws/k-fet/open", fail_on_none=True @async_to_sync
) async def test_standard_user(self):
"""Lambda user is added to kfet.open.base group."""
# setup anonymous client
c = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
connected, _ = await c.connect()
self.assertTrue(connected)
# initialization data is replied on connection # initialization data is replied on connection
self.assertIsNotNone(c.receive()) message = await c.receive_json_from()
self.assertIsNotNone(message)
# client belongs to the 'kfet.open.admin' group... # client belongs to the 'kfet.open' group...
OpenKfetConsumer.group_send("kfet.open.team", {"test": "plop"}) channel_layer = get_channel_layer()
self.assertEqual(c.receive(), {"test": "plop"})
# ... but not to the 'kfet.open' one await channel_layer.group_send(
OpenKfetConsumer.group_send("kfet.open.base", {"test": "plop"}) "kfet.open.base", {"test": "plop", "type": "open.status"}
self.assertIsNone(c.receive()) )
message = await c.receive_json_from()
self.assertEqual(message, {"test": "plop"})
# ...but not to the 'kfet.open.admin' one
await channel_layer.group_send(
"kfet.open.team", {"test": "plop", "type": "open.status"}
)
self.assertTrue(await c.receive_nothing())
async def test_team_user(self):
"""Team user is added to kfet.open.team group."""
with mock.patch("gestioncof.signals.messages"), mock.patch(
"kfet.open.consumers.kfet_is_team", return_value=True
):
c = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
connected, _ = await c.connect()
channel_layer = get_channel_layer()
self.assertTrue(connected)
# initialization data is replied on connection
message = await c.receive_json_from()
self.assertIsNotNone(message)
# client belongs to the 'kfet.open.team' group...
await channel_layer.group_send(
"kfet.open.team", {"test": "plop", "type": "open.status"}
)
message = await c.receive_json_from()
self.assertEqual(message, {"test": "plop"})
# ...but not to the 'kfet.open' one
await channel_layer.group_send(
"kfet.open.base", {"test": "plop", "type": "open.status"}
)
self.assertTrue(await c.receive_nothing())
class OpenKfetScenarioTest(ChannelTestCase): class OpenKfetScenarioTest(TestCase):
"""OpenKfet functionnal tests suite.""" """OpenKfet functionnal tests suite."""
def setUp(self): def setUp(self):
@ -241,91 +276,115 @@ class OpenKfetScenarioTest(ChannelTestCase):
# anonymous client (for views) # anonymous client (for views)
self.c = Client() self.c = Client()
# anonymous client (for websockets)
self.c_ws = WSClient()
# root user # root user
self.r = User.objects.create_superuser("root", "", "root") self.r = User.objects.create_superuser("root", "", "root")
# its client (for views)
# root client
self.r_c = Client() self.r_c = Client()
self.r_c.login(username="root", password="root") self.r_c.login(username="root", password="root")
# its client (for websockets)
self.r_c_ws = WSClient()
self.r_c_ws.force_login(
self.r, backend="django.contrib.auth.backends.ModelBackend"
)
self.kfet_open = OpenKfet( self.kfet_open = OpenKfet(
cache_prefix="test_kfetopen_%s" % random.randrange(2**20) cache_prefix="test_kfetopen_%s" % random.randrange(2**20)
) )
self.addCleanup(self.kfet_open.clear_cache) self.addCleanup(self.kfet_open.clear_cache)
def ws_connect(self, ws_client): async def ws_connect(self, ws_client):
ws_client.send_and_consume( c, _ = await ws_client.connect()
"websocket.connect", path="/ws/k-fet/open", fail_on_none=True
)
return ws_client.receive(json=True)
def test_scenario_0(self): self.assertTrue(c)
return await ws_client.receive_json_from()
async def test_scenario_0(self):
"""Clients connect.""" """Clients connect."""
# anonymous client (for websockets)
self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
# test for anonymous user # test for anonymous user
msg = self.ws_connect(self.c_ws) msg = await self.ws_connect(self.c_ws)
self.assertSetEqual(set(["status"]), set(msg)) self.assertSetEqual(set(["status"]), set(msg))
# test for root user # test for root user
msg = self.ws_connect(self.r_c_ws) with mock.patch(
self.assertSetEqual(set(["status", "admin_status", "force_close"]), set(msg)) "kfet.open.consumers.kfet_is_team", return_value=True
), mock.patch("kfet.open.open.kfet_is_team", return_value=True):
self.r_c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
def test_scenario_1(self): msg = await self.ws_connect(self.r_c_ws)
self.assertSetEqual(
set(["status", "admin_status", "force_close"]), set(msg)
)
async def test_scenario_1(self):
"""Clients connect, door opens, enable force close.""" """Clients connect, door opens, enable force close."""
self.ws_connect(self.c_ws)
self.ws_connect(self.r_c_ws) self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
await self.ws_connect(self.c_ws)
with mock.patch(
"kfet.open.consumers.kfet_is_team", return_value=True
), mock.patch("kfet.open.open.kfet_is_team", return_value=True):
self.r_c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
await self.ws_connect(self.r_c_ws)
# door sent "I'm open!" # door sent "I'm open!"
self.c.post("/k-fet/open/raw_open", {"raw_open": True, "token": "plop"}) await sync_to_async(self.c.post)(
"/k-fet/open/raw_open", {"raw_open": True, "token": "plop"}
)
# anonymous user agree # anonymous user agree
msg = self.c_ws.receive(json=True) msg = await self.c_ws.receive_json_from()
self.assertEqual(OpenKfet.OPENED, msg["status"]) self.assertEqual(OpenKfet.OPENED, msg["status"])
# root user too # root user too
msg = self.r_c_ws.receive(json=True) msg = await self.r_c_ws.receive_json_from()
self.assertEqual(OpenKfet.OPENED, msg["status"]) self.assertEqual(OpenKfet.OPENED, msg["status"])
self.assertEqual(OpenKfet.OPENED, msg["admin_status"]) self.assertEqual(OpenKfet.OPENED, msg["admin_status"])
# admin says "no it's closed" # admin says "no it's closed"
self.r_c.post("/k-fet/open/force_close", {"force_close": True}) await sync_to_async(self.r_c.post)(
"/k-fet/open/force_close", {"force_close": True}
)
# so anonymous user see it's closed # so anonymous user see it's closed
msg = self.c_ws.receive(json=True) msg = await self.c_ws.receive_json_from()
self.assertEqual(OpenKfet.CLOSED, msg["status"]) self.assertEqual(OpenKfet.CLOSED, msg["status"])
# root user too # root user too
msg = self.r_c_ws.receive(json=True) msg = await self.r_c_ws.receive_json_from()
self.assertEqual(OpenKfet.CLOSED, msg["status"]) self.assertEqual(OpenKfet.CLOSED, msg["status"])
# but root knows things # but root knows things
self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"]) self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"])
self.assertTrue(msg["force_close"]) self.assertTrue(msg["force_close"])
def test_scenario_2(self): async def test_scenario_2(self):
"""Starting falsely closed, clients connect, disable force close.""" """Starting falsely closed, clients connect, disable force close."""
self.kfet_open.raw_open = True self.kfet_open.raw_open = True
self.kfet_open.force_close = True self.kfet_open.force_close = True
msg = self.ws_connect(self.c_ws) self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
msg = await self.ws_connect(self.c_ws)
self.assertEqual(OpenKfet.CLOSED, msg["status"]) self.assertEqual(OpenKfet.CLOSED, msg["status"])
msg = self.ws_connect(self.r_c_ws) with mock.patch(
self.assertEqual(OpenKfet.CLOSED, msg["status"]) "kfet.open.consumers.kfet_is_team", return_value=True
self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"]) ), mock.patch("kfet.open.open.kfet_is_team", return_value=True):
self.assertTrue(msg["force_close"]) self.r_c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
msg = await self.ws_connect(self.r_c_ws)
self.assertEqual(OpenKfet.CLOSED, msg["status"])
self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"])
self.assertTrue(msg["force_close"])
self.r_c.post("/k-fet/open/force_close", {"force_close": False}) await sync_to_async(self.r_c.post)(
"/k-fet/open/force_close", {"force_close": False}
)
msg = self.c_ws.receive(json=True) msg = await self.c_ws.receive_json_from()
self.assertEqual(OpenKfet.OPENED, msg["status"]) self.assertEqual(OpenKfet.OPENED, msg["status"])
msg = self.r_c_ws.receive(json=True) msg = await self.r_c_ws.receive_json_from()
self.assertEqual(OpenKfet.OPENED, msg["status"]) self.assertEqual(OpenKfet.OPENED, msg["status"])
self.assertEqual(OpenKfet.OPENED, msg["admin_status"]) self.assertEqual(OpenKfet.OPENED, msg["admin_status"])
self.assertFalse(msg["force_close"]) self.assertFalse(msg["force_close"])