Fix kfet.ope tests
This commit is contained in:
parent
8318af1fd3
commit
461ab6c7a1
1 changed files with 152 additions and 93 deletions
|
@ -1,19 +1,24 @@
|
||||||
import json
|
|
||||||
import random
|
import random
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
|
|
||||||
from channels.channel import Group
|
from asgiref.sync import async_to_sync, sync_to_async
|
||||||
from channels.test import ChannelTestCase, WSClient
|
from channels.auth import AuthMiddlewareStack
|
||||||
|
from channels.consumer import get_channel_layer
|
||||||
|
from channels.testing import WebsocketCommunicator
|
||||||
from django.contrib.auth.models import AnonymousUser, Permission, User
|
from django.contrib.auth.models import AnonymousUser, Permission, User
|
||||||
from django.test import Client
|
from django.test import Client, TestCase
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
|
|
||||||
from . import OpenKfet
|
from . import OpenKfet
|
||||||
from .consumers import OpenKfetConsumer
|
from .consumers import OpenKfetConsumer
|
||||||
|
|
||||||
|
|
||||||
class OpenKfetTest(ChannelTestCase):
|
def ws_communicator(cls, path: str, headers=[]):
|
||||||
|
return WebsocketCommunicator(AuthMiddlewareStack(cls.as_asgi()), path, headers)
|
||||||
|
|
||||||
|
|
||||||
|
class OpenKfetTest(TestCase):
|
||||||
"""OpenKfet object unit-tests suite."""
|
"""OpenKfet object unit-tests suite."""
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
@ -79,7 +84,7 @@ class OpenKfetTest(ChannelTestCase):
|
||||||
def test_export_user(self):
|
def test_export_user(self):
|
||||||
"""Export is limited for an anonymous user."""
|
"""Export is limited for an anonymous user."""
|
||||||
export = self.kfet_open.export(AnonymousUser())
|
export = self.kfet_open.export(AnonymousUser())
|
||||||
self.assertSetEqual(set(["status"]), set(export))
|
self.assertSetEqual(set(["status", "type"]), set(export))
|
||||||
|
|
||||||
def test_export_team(self):
|
def test_export_team(self):
|
||||||
"""Export all values for a team member."""
|
"""Export all values for a team member."""
|
||||||
|
@ -89,24 +94,32 @@ class OpenKfetTest(ChannelTestCase):
|
||||||
)
|
)
|
||||||
user.user_permissions.add(is_team)
|
user.user_permissions.add(is_team)
|
||||||
export = self.kfet_open.export(user)
|
export = self.kfet_open.export(user)
|
||||||
self.assertSetEqual(set(["status", "admin_status", "force_close"]), set(export))
|
self.assertSetEqual(
|
||||||
|
set(["status", "admin_status", "force_close", "type"]), set(export)
|
||||||
|
)
|
||||||
|
|
||||||
def test_send_ws(self):
|
async def test_send_ws(self):
|
||||||
Group("kfet.open.base").add("test.open.base")
|
channel_layer = get_channel_layer()
|
||||||
Group("kfet.open.team").add("test.open.team")
|
base_channel = await channel_layer.new_channel()
|
||||||
|
team_channel = await channel_layer.new_channel()
|
||||||
|
|
||||||
self.kfet_open.send_ws()
|
await channel_layer.group_add("kfet.open.base", base_channel)
|
||||||
|
await channel_layer.group_add("kfet.open.team", team_channel)
|
||||||
|
|
||||||
recv_base = self.get_next_message("test.open.base", require=True)
|
await self.kfet_open.send_ws()
|
||||||
base = json.loads(recv_base["text"])
|
|
||||||
self.assertSetEqual(set(["status"]), set(base))
|
|
||||||
|
|
||||||
recv_admin = self.get_next_message("test.open.team", require=True)
|
base = await channel_layer.receive(base_channel)
|
||||||
admin = json.loads(recv_admin["text"])
|
|
||||||
self.assertSetEqual(set(["status", "admin_status", "force_close"]), set(admin))
|
self.assertSetEqual(set(["status", "type"]), set(base))
|
||||||
|
|
||||||
|
team = await channel_layer.receive(team_channel)
|
||||||
|
|
||||||
|
self.assertSetEqual(
|
||||||
|
set(["status", "admin_status", "force_close", "type"]), set(team)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
class OpenKfetViewsTest(ChannelTestCase):
|
class OpenKfetViewsTest(TestCase):
|
||||||
"""OpenKfet views unit-tests suite."""
|
"""OpenKfet views unit-tests suite."""
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
@ -177,60 +190,82 @@ class OpenKfetViewsTest(ChannelTestCase):
|
||||||
self.assertEqual(403, resp.status_code)
|
self.assertEqual(403, resp.status_code)
|
||||||
|
|
||||||
|
|
||||||
class OpenKfetConsumerTest(ChannelTestCase):
|
class OpenKfetConsumerTest(TestCase):
|
||||||
"""OpenKfet consumer unit-tests suite."""
|
"""OpenKfet consumer unit-tests suite."""
|
||||||
|
|
||||||
def test_standard_user(self):
|
def setUp(self):
|
||||||
"""Lambda user is added to kfet.open.base group."""
|
|
||||||
# setup anonymous client
|
|
||||||
c = WSClient()
|
|
||||||
|
|
||||||
# connect
|
|
||||||
c.send_and_consume(
|
|
||||||
"websocket.connect", path="/ws/k-fet/open", fail_on_none=True
|
|
||||||
)
|
|
||||||
|
|
||||||
# initialization data is replied on connection
|
|
||||||
self.assertIsNotNone(c.receive())
|
|
||||||
|
|
||||||
# client belongs to the 'kfet.open' group...
|
|
||||||
OpenKfetConsumer.group_send("kfet.open.base", {"test": "plop"})
|
|
||||||
self.assertEqual(c.receive(), {"test": "plop"})
|
|
||||||
|
|
||||||
# ...but not to the 'kfet.open.admin' one
|
|
||||||
OpenKfetConsumer.group_send("kfet.open.team", {"test": "plop"})
|
|
||||||
self.assertIsNone(c.receive())
|
|
||||||
|
|
||||||
@mock.patch("gestioncof.signals.messages")
|
|
||||||
def test_team_user(self, mock_messages):
|
|
||||||
"""Team user is added to kfet.open.team group."""
|
|
||||||
# setup team user and its client
|
|
||||||
t = User.objects.create_user("team", "", "team")
|
t = User.objects.create_user("team", "", "team")
|
||||||
is_team = Permission.objects.get(
|
is_team = Permission.objects.get(
|
||||||
codename="is_team", content_type__app_label="kfet"
|
codename="is_team", content_type__app_label="kfet"
|
||||||
)
|
)
|
||||||
t.user_permissions.add(is_team)
|
t.user_permissions.add(is_team)
|
||||||
c = WSClient()
|
|
||||||
c.force_login(t, backend="django.contrib.auth.backends.ModelBackend")
|
|
||||||
|
|
||||||
# connect
|
self.team_user = t
|
||||||
c.send_and_consume(
|
|
||||||
"websocket.connect", path="/ws/k-fet/open", fail_on_none=True
|
@async_to_sync
|
||||||
)
|
async def test_standard_user(self):
|
||||||
|
"""Lambda user is added to kfet.open.base group."""
|
||||||
|
# setup anonymous client
|
||||||
|
c = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
|
||||||
|
|
||||||
|
connected, _ = await c.connect()
|
||||||
|
|
||||||
|
self.assertTrue(connected)
|
||||||
|
|
||||||
# initialization data is replied on connection
|
# initialization data is replied on connection
|
||||||
self.assertIsNotNone(c.receive())
|
message = await c.receive_json_from()
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
|
||||||
# client belongs to the 'kfet.open.admin' group...
|
# client belongs to the 'kfet.open' group...
|
||||||
OpenKfetConsumer.group_send("kfet.open.team", {"test": "plop"})
|
channel_layer = get_channel_layer()
|
||||||
self.assertEqual(c.receive(), {"test": "plop"})
|
|
||||||
|
await channel_layer.group_send(
|
||||||
|
"kfet.open.base", {"test": "plop", "type": "open.status"}
|
||||||
|
)
|
||||||
|
message = await c.receive_json_from()
|
||||||
|
|
||||||
|
self.assertEqual(message, {"test": "plop"})
|
||||||
|
|
||||||
|
# ...but not to the 'kfet.open.admin' one
|
||||||
|
await channel_layer.group_send(
|
||||||
|
"kfet.open.team", {"test": "plop", "type": "open.status"}
|
||||||
|
)
|
||||||
|
self.assertTrue(await c.receive_nothing())
|
||||||
|
|
||||||
|
async def test_team_user(self):
|
||||||
|
"""Team user is added to kfet.open.team group."""
|
||||||
|
|
||||||
|
with mock.patch("gestioncof.signals.messages"), mock.patch(
|
||||||
|
"kfet.open.consumers.kfet_is_team", return_value=True
|
||||||
|
):
|
||||||
|
c = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
|
||||||
|
|
||||||
|
connected, _ = await c.connect()
|
||||||
|
|
||||||
|
channel_layer = get_channel_layer()
|
||||||
|
|
||||||
|
self.assertTrue(connected)
|
||||||
|
|
||||||
|
# initialization data is replied on connection
|
||||||
|
message = await c.receive_json_from()
|
||||||
|
self.assertIsNotNone(message)
|
||||||
|
|
||||||
|
# client belongs to the 'kfet.open.team' group...
|
||||||
|
await channel_layer.group_send(
|
||||||
|
"kfet.open.team", {"test": "plop", "type": "open.status"}
|
||||||
|
)
|
||||||
|
message = await c.receive_json_from()
|
||||||
|
|
||||||
|
self.assertEqual(message, {"test": "plop"})
|
||||||
|
|
||||||
# ...but not to the 'kfet.open' one
|
# ...but not to the 'kfet.open' one
|
||||||
OpenKfetConsumer.group_send("kfet.open.base", {"test": "plop"})
|
await channel_layer.group_send(
|
||||||
self.assertIsNone(c.receive())
|
"kfet.open.base", {"test": "plop", "type": "open.status"}
|
||||||
|
)
|
||||||
|
self.assertTrue(await c.receive_nothing())
|
||||||
|
|
||||||
|
|
||||||
class OpenKfetScenarioTest(ChannelTestCase):
|
class OpenKfetScenarioTest(TestCase):
|
||||||
"""OpenKfet functionnal tests suite."""
|
"""OpenKfet functionnal tests suite."""
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
|
@ -241,91 +276,115 @@ class OpenKfetScenarioTest(ChannelTestCase):
|
||||||
|
|
||||||
# anonymous client (for views)
|
# anonymous client (for views)
|
||||||
self.c = Client()
|
self.c = Client()
|
||||||
# anonymous client (for websockets)
|
|
||||||
self.c_ws = WSClient()
|
|
||||||
|
|
||||||
# root user
|
# root user
|
||||||
self.r = User.objects.create_superuser("root", "", "root")
|
self.r = User.objects.create_superuser("root", "", "root")
|
||||||
# its client (for views)
|
|
||||||
|
# root client
|
||||||
self.r_c = Client()
|
self.r_c = Client()
|
||||||
self.r_c.login(username="root", password="root")
|
self.r_c.login(username="root", password="root")
|
||||||
# its client (for websockets)
|
|
||||||
self.r_c_ws = WSClient()
|
|
||||||
self.r_c_ws.force_login(
|
|
||||||
self.r, backend="django.contrib.auth.backends.ModelBackend"
|
|
||||||
)
|
|
||||||
|
|
||||||
self.kfet_open = OpenKfet(
|
self.kfet_open = OpenKfet(
|
||||||
cache_prefix="test_kfetopen_%s" % random.randrange(2**20)
|
cache_prefix="test_kfetopen_%s" % random.randrange(2**20)
|
||||||
)
|
)
|
||||||
self.addCleanup(self.kfet_open.clear_cache)
|
self.addCleanup(self.kfet_open.clear_cache)
|
||||||
|
|
||||||
def ws_connect(self, ws_client):
|
async def ws_connect(self, ws_client):
|
||||||
ws_client.send_and_consume(
|
c, _ = await ws_client.connect()
|
||||||
"websocket.connect", path="/ws/k-fet/open", fail_on_none=True
|
|
||||||
)
|
|
||||||
return ws_client.receive(json=True)
|
|
||||||
|
|
||||||
def test_scenario_0(self):
|
self.assertTrue(c)
|
||||||
|
|
||||||
|
return await ws_client.receive_json_from()
|
||||||
|
|
||||||
|
async def test_scenario_0(self):
|
||||||
"""Clients connect."""
|
"""Clients connect."""
|
||||||
|
# anonymous client (for websockets)
|
||||||
|
self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
|
||||||
|
|
||||||
# test for anonymous user
|
# test for anonymous user
|
||||||
msg = self.ws_connect(self.c_ws)
|
msg = await self.ws_connect(self.c_ws)
|
||||||
self.assertSetEqual(set(["status"]), set(msg))
|
self.assertSetEqual(set(["status"]), set(msg))
|
||||||
|
|
||||||
# test for root user
|
# test for root user
|
||||||
msg = self.ws_connect(self.r_c_ws)
|
with mock.patch(
|
||||||
self.assertSetEqual(set(["status", "admin_status", "force_close"]), set(msg))
|
"kfet.open.consumers.kfet_is_team", return_value=True
|
||||||
|
), mock.patch("kfet.open.open.kfet_is_team", return_value=True):
|
||||||
|
self.r_c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
|
||||||
|
|
||||||
def test_scenario_1(self):
|
msg = await self.ws_connect(self.r_c_ws)
|
||||||
|
self.assertSetEqual(
|
||||||
|
set(["status", "admin_status", "force_close"]), set(msg)
|
||||||
|
)
|
||||||
|
|
||||||
|
async def test_scenario_1(self):
|
||||||
"""Clients connect, door opens, enable force close."""
|
"""Clients connect, door opens, enable force close."""
|
||||||
self.ws_connect(self.c_ws)
|
|
||||||
self.ws_connect(self.r_c_ws)
|
self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
|
||||||
|
await self.ws_connect(self.c_ws)
|
||||||
|
|
||||||
|
with mock.patch(
|
||||||
|
"kfet.open.consumers.kfet_is_team", return_value=True
|
||||||
|
), mock.patch("kfet.open.open.kfet_is_team", return_value=True):
|
||||||
|
self.r_c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
|
||||||
|
await self.ws_connect(self.r_c_ws)
|
||||||
|
|
||||||
# door sent "I'm open!"
|
# door sent "I'm open!"
|
||||||
self.c.post("/k-fet/open/raw_open", {"raw_open": True, "token": "plop"})
|
await sync_to_async(self.c.post)(
|
||||||
|
"/k-fet/open/raw_open", {"raw_open": True, "token": "plop"}
|
||||||
|
)
|
||||||
|
|
||||||
# anonymous user agree
|
# anonymous user agree
|
||||||
msg = self.c_ws.receive(json=True)
|
msg = await self.c_ws.receive_json_from()
|
||||||
self.assertEqual(OpenKfet.OPENED, msg["status"])
|
self.assertEqual(OpenKfet.OPENED, msg["status"])
|
||||||
|
|
||||||
# root user too
|
# root user too
|
||||||
msg = self.r_c_ws.receive(json=True)
|
msg = await self.r_c_ws.receive_json_from()
|
||||||
self.assertEqual(OpenKfet.OPENED, msg["status"])
|
self.assertEqual(OpenKfet.OPENED, msg["status"])
|
||||||
self.assertEqual(OpenKfet.OPENED, msg["admin_status"])
|
self.assertEqual(OpenKfet.OPENED, msg["admin_status"])
|
||||||
|
|
||||||
# admin says "no it's closed"
|
# admin says "no it's closed"
|
||||||
self.r_c.post("/k-fet/open/force_close", {"force_close": True})
|
await sync_to_async(self.r_c.post)(
|
||||||
|
"/k-fet/open/force_close", {"force_close": True}
|
||||||
|
)
|
||||||
|
|
||||||
# so anonymous user see it's closed
|
# so anonymous user see it's closed
|
||||||
msg = self.c_ws.receive(json=True)
|
msg = await self.c_ws.receive_json_from()
|
||||||
self.assertEqual(OpenKfet.CLOSED, msg["status"])
|
self.assertEqual(OpenKfet.CLOSED, msg["status"])
|
||||||
|
|
||||||
# root user too
|
# root user too
|
||||||
msg = self.r_c_ws.receive(json=True)
|
msg = await self.r_c_ws.receive_json_from()
|
||||||
self.assertEqual(OpenKfet.CLOSED, msg["status"])
|
self.assertEqual(OpenKfet.CLOSED, msg["status"])
|
||||||
# but root knows things
|
# but root knows things
|
||||||
self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"])
|
self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"])
|
||||||
self.assertTrue(msg["force_close"])
|
self.assertTrue(msg["force_close"])
|
||||||
|
|
||||||
def test_scenario_2(self):
|
async def test_scenario_2(self):
|
||||||
"""Starting falsely closed, clients connect, disable force close."""
|
"""Starting falsely closed, clients connect, disable force close."""
|
||||||
self.kfet_open.raw_open = True
|
self.kfet_open.raw_open = True
|
||||||
self.kfet_open.force_close = True
|
self.kfet_open.force_close = True
|
||||||
|
|
||||||
msg = self.ws_connect(self.c_ws)
|
self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
|
||||||
|
|
||||||
|
msg = await self.ws_connect(self.c_ws)
|
||||||
self.assertEqual(OpenKfet.CLOSED, msg["status"])
|
self.assertEqual(OpenKfet.CLOSED, msg["status"])
|
||||||
|
|
||||||
msg = self.ws_connect(self.r_c_ws)
|
with mock.patch(
|
||||||
|
"kfet.open.consumers.kfet_is_team", return_value=True
|
||||||
|
), mock.patch("kfet.open.open.kfet_is_team", return_value=True):
|
||||||
|
self.r_c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")
|
||||||
|
msg = await self.ws_connect(self.r_c_ws)
|
||||||
self.assertEqual(OpenKfet.CLOSED, msg["status"])
|
self.assertEqual(OpenKfet.CLOSED, msg["status"])
|
||||||
self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"])
|
self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"])
|
||||||
self.assertTrue(msg["force_close"])
|
self.assertTrue(msg["force_close"])
|
||||||
|
|
||||||
self.r_c.post("/k-fet/open/force_close", {"force_close": False})
|
await sync_to_async(self.r_c.post)(
|
||||||
|
"/k-fet/open/force_close", {"force_close": False}
|
||||||
|
)
|
||||||
|
|
||||||
msg = self.c_ws.receive(json=True)
|
msg = await self.c_ws.receive_json_from()
|
||||||
self.assertEqual(OpenKfet.OPENED, msg["status"])
|
self.assertEqual(OpenKfet.OPENED, msg["status"])
|
||||||
|
|
||||||
msg = self.r_c_ws.receive(json=True)
|
msg = await self.r_c_ws.receive_json_from()
|
||||||
self.assertEqual(OpenKfet.OPENED, msg["status"])
|
self.assertEqual(OpenKfet.OPENED, msg["status"])
|
||||||
self.assertEqual(OpenKfet.OPENED, msg["admin_status"])
|
self.assertEqual(OpenKfet.OPENED, msg["admin_status"])
|
||||||
self.assertFalse(msg["force_close"])
|
self.assertFalse(msg["force_close"])
|
||||||
|
|
Loading…
Reference in a new issue