380 lines
14 KiB
Python
380 lines
14 KiB
Python
import random
|
|
from datetime import timedelta
|
|
from unittest import mock
|
|
|
|
from asgiref.sync import async_to_sync
|
|
from channels.auth import AuthMiddlewareStack
|
|
from channels.consumer import get_channel_layer
|
|
from channels.testing import WebsocketCommunicator
|
|
from django.contrib.auth.models import AnonymousUser, Permission, User
|
|
from django.test import Client, TestCase
|
|
from django.utils import timezone
|
|
|
|
from . import OpenKfet
|
|
from .consumers import OpenKfetConsumer
|
|
|
|
|
|
def ws_communicator(cls, path: str, headers=None):
    """Build a websocket test communicator for the consumer class ``cls``.

    The consumer's ASGI application (``cls.as_asgi()``) is wrapped in
    ``AuthMiddlewareStack`` before being handed to ``WebsocketCommunicator``.

    Args:
        cls: consumer class exposing ``as_asgi()``.
        path: URL path the communicator connects to.
        headers: optional headers forwarded to the communicator; ``None``
            (the default) is treated as an empty list.

    Returns:
        The ``WebsocketCommunicator`` instance, not yet connected.
    """
    # Use a fresh list per call rather than a mutable default that would be
    # shared by every communicator built through this helper.
    if headers is None:
        headers = []
    return WebsocketCommunicator(AuthMiddlewareStack(cls.as_asgi()), path, headers)
|
|
|
|
|
|
class OpenKfetTest(TestCase):
    """Unit tests exercising an OpenKfet instance directly."""

    def setUp(self):
        # Work on a dedicated instance with a randomised cache prefix and
        # register its clear_cache() to run once the test is over.
        prefix = "test_kfetopen_%s" % random.randrange(2**20)
        self.kfet_open = OpenKfet(cache_prefix=prefix)
        self.addCleanup(self.kfet_open.clear_cache)

    def test_defaults(self):
        """A fresh instance is closed, not force-closed and never updated."""
        fresh = self.kfet_open
        self.assertFalse(fresh.raw_open)
        self.assertIsNone(fresh.last_update)
        self.assertFalse(fresh.force_close)
        self.assertFalse(fresh.is_open)

    def test_raw_open(self):
        """raw_open can be read back, and writing it changes last_update."""
        for value in (True, False):
            before = self.kfet_open.last_update
            self.kfet_open.raw_open = value
            self.assertEqual(value, self.kfet_open.raw_open)
            self.assertNotEqual(before, self.kfet_open.last_update)

    def test_force_close(self):
        """force_close can be written and read back."""
        for value in (True, False):
            self.kfet_open.force_close = value
            self.assertEqual(value, self.kfet_open.force_close)

    def test_is_open(self):
        """Without force_close, is_open mirrors raw_open."""
        self.kfet_open.force_close = False
        for value in (True, False):
            self.kfet_open.raw_open = value
            self.assertEqual(value, self.kfet_open.is_open)

    def test_is_open_force_close(self):
        """With force_close enabled, is_open is False whatever raw_open is."""
        self.kfet_open.force_close = True
        for value in (True, False):
            self.kfet_open.raw_open = value
            self.assertFalse(self.kfet_open.is_open)

    def test_status(self):
        """status() and admin_status() for each raw_open/force_close pair."""
        expectations = (
            # raw_open, force_close, public status, admin status
            (False, False, OpenKfet.CLOSED, OpenKfet.CLOSED),
            (False, True, OpenKfet.CLOSED, OpenKfet.CLOSED),
            (True, False, OpenKfet.OPENED, OpenKfet.OPENED),
            (True, True, OpenKfet.CLOSED, OpenKfet.FAKE_CLOSED),
        )
        for door, forced, public, admin in expectations:
            self.kfet_open.raw_open = door
            self.kfet_open.force_close = forced
            self.assertEqual(public, self.kfet_open.status())
            self.assertEqual(admin, self.kfet_open.admin_status())

    def test_status_unknown(self):
        """status() is UNKNOWN when the last update is 30 days old."""
        self.kfet_open.raw_open = True
        # Backdate the update timestamp that the assignment above just set.
        stale = timezone.now() - timedelta(days=30)
        self.kfet_open._last_update = stale
        self.assertEqual(OpenKfet.UNKNOWN, self.kfet_open.status())

    def test_export_user(self):
        """An anonymous user only gets the 'status' and 'type' keys."""
        payload = self.kfet_open.export(AnonymousUser())
        self.assertSetEqual({"status", "type"}, set(payload))

    def test_export_team(self):
        """A user holding kfet.is_team gets the full set of keys."""
        member = User.objects.create_user("team", "", "team")
        team_perm = Permission.objects.get(
            codename="is_team", content_type__app_label="kfet"
        )
        member.user_permissions.add(team_perm)

        payload = self.kfet_open.export(member)

        expected_keys = {"status", "admin_status", "force_close", "type"}
        self.assertSetEqual(expected_keys, set(payload))

    async def test_send_ws(self):
        """send_ws() delivers a message to the base and the team groups."""
        layer = get_channel_layer()
        base_channel = await layer.new_channel()
        team_channel = await layer.new_channel()
        await layer.group_add("kfet.open.base", base_channel)
        await layer.group_add("kfet.open.team", team_channel)

        await self.kfet_open.send_ws()

        base_msg = await layer.receive(base_channel)
        self.assertSetEqual({"status", "type"}, set(base_msg))

        team_msg = await layer.receive(team_channel)
        expected_keys = {"status", "admin_status", "force_close", "type"}
        self.assertSetEqual(expected_keys, set(team_msg))
|
|
|
|
|
|
class OpenKfetViewsTest(TestCase):
    """Unit tests for the HTTP views that edit the OpenKfet state."""

    def setUp(self):
        # Client.login() is called below, so gestioncof.signals.messages has
        # to be patched here, before the first login happens.
        messages_patch = mock.patch("gestioncof.signals.messages")
        messages_patch.start()
        self.addCleanup(messages_patch.stop)

        def kfet_perm(codename):
            # Look up a permission of the "kfet" app by codename.
            return Permission.objects.get(
                codename=codename, content_type__app_label="kfet"
            )

        is_team = kfet_perm("is_team")
        can_force_close = kfet_perm("can_force_close")

        # Plain authenticated user and its client.
        self.u = User.objects.create_user("user", "", "user")
        self.c = Client()
        self.c.login(username="user", password="user")

        # Team member (kfet.is_team only) and its client.
        self.t = User.objects.create_user("team", "", "team")
        self.t.user_permissions.add(is_team)
        self.c_t = Client()
        self.c_t.login(username="team", password="team")

        # Admin (kfet.is_team + kfet.can_force_close) and its client.
        self.a = User.objects.create_user("admin", "", "admin")
        self.a.user_permissions.add(is_team, can_force_close)
        self.c_a = Client()
        self.c_a.login(username="admin", password="admin")

        # Dedicated OpenKfet instance with a randomised cache prefix; its
        # clear_cache() runs as cleanup.
        prefix = "test_kfetopen_%s" % random.randrange(2**20)
        self.kfet_open = OpenKfet(cache_prefix=prefix)
        self.addCleanup(self.kfet_open.clear_cache)

        # Make the views operate on that instance for the test's duration.
        views_patch = mock.patch("kfet.open.views.kfet_open", self.kfet_open)
        views_patch.start()
        self.addCleanup(views_patch.stop)

    def test_door(self):
        """POSTing raw_open (with the token) updates OpenKfet.raw_open."""
        for sent, expected in ((1, True), (0, False)):
            payload = {"raw_open": sent, "token": "plop"}
            response = Client().post("/k-fet/open/raw_open", payload)
            self.assertEqual(200, response.status_code)
            self.assertEqual(expected, self.kfet_open.raw_open)

    def test_force_close(self):
        """The admin client can switch force_close on and off."""
        for sent, expected in ((1, True), (0, False)):
            payload = {"force_close": sent}
            response = self.c_a.post("/k-fet/open/force_close", payload)
            self.assertEqual(200, response.status_code)
            self.assertEqual(expected, self.kfet_open.force_close)

    def test_force_close_forbidden(self):
        """Clients lacking kfet.can_force_close get a 403 on force_close."""
        for client in (Client(), self.c, self.c_t):
            response = client.post("/k-fet/open/force_close", {"force_close": 0})
            self.assertEqual(403, response.status_code)
|
|
|
|
|
|
class OpenKfetConsumerTest(TestCase):
    """Unit tests for the OpenKfet websocket consumer."""

    @classmethod
    def setUpTestData(cls):
        # A user holding the kfet.is_team permission, shared by the class.
        member = User.objects.create_user("team", "", "team")
        is_team = Permission.objects.get(
            codename="is_team", content_type__app_label="kfet"
        )
        member.user_permissions.add(is_team)

        cls.team_user = member

    async def test_standard_user(self):
        """A regular client is added to the kfet.open.base group only."""
        # Set up an anonymous client.
        c = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")

        connected, _ = await c.connect()
        self.assertTrue(connected)

        # Initialization data is sent back on connection.
        message = await c.receive_json_from()
        self.assertIsNotNone(message)

        # The client belongs to the 'kfet.open.base' group...
        channel_layer = get_channel_layer()
        await channel_layer.group_send(
            "kfet.open.base", {"test": "plop", "type": "open.status"}
        )
        message = await c.receive_json_from()
        self.assertEqual(message, {"test": "plop"})

        # ...but not to the 'kfet.open.team' one.
        await channel_layer.group_send(
            "kfet.open.team", {"test": "plop", "type": "open.status"}
        )
        self.assertTrue(await c.receive_nothing())

        # Close the socket so the consumer task is not left pending when the
        # test returns.
        await c.disconnect()

    async def test_team_user(self):
        """A client seen as team is added to the kfet.open.team group only."""
        # kfet_is_team is forced to True, so the connecting client is treated
        # as a team member. The patches stay active for the whole exchange,
        # including the final disconnect.
        with mock.patch("gestioncof.signals.messages"), mock.patch(
            "kfet.open.consumers.kfet_is_team", return_value=True
        ):
            c = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")

            connected, _ = await c.connect()
            channel_layer = get_channel_layer()
            self.assertTrue(connected)

            # Initialization data is sent back on connection.
            message = await c.receive_json_from()
            self.assertIsNotNone(message)

            # The client belongs to the 'kfet.open.team' group...
            await channel_layer.group_send(
                "kfet.open.team", {"test": "plop", "type": "open.status"}
            )
            message = await c.receive_json_from()
            self.assertEqual(message, {"test": "plop"})

            # ...but not to the 'kfet.open.base' one.
            await channel_layer.group_send(
                "kfet.open.base", {"test": "plop", "type": "open.status"}
            )
            self.assertTrue(await c.receive_nothing())

            # Close the socket so the consumer task is not left pending when
            # the test returns.
            await c.disconnect()
|
|
|
|
|
|
class OpenKfetScenarioTest(TestCase):
    """OpenKfet functional tests suite.

    Each scenario works on its own OpenKfet instance (randomised cache
    prefix, cleared on cleanup). The views are patched to use that instance
    so that a scenario neither depends on nor leaves behind state in the
    module-level instance.
    """

    @classmethod
    def setUpTestData(cls):
        # Root user.
        cls.r = User.objects.create_superuser("team", "", "team")

        # Anonymous client (for views).
        cls.c = Client()

        # Root client; login needs gestioncof.signals.messages patched.
        cls.r_c = Client()
        with mock.patch("gestioncof.signals.messages"):
            cls.r_c.login(username="team", password="team")

    def setUp(self):
        # Create channels to listen to the messages broadcast to each group.
        channel_layer = get_channel_layer()

        self.channel = async_to_sync(channel_layer.new_channel)()
        self.team_channel = async_to_sync(channel_layer.new_channel)()

        async_to_sync(channel_layer.group_add)("kfet.open.base", self.channel)
        async_to_sync(channel_layer.group_add)("kfet.open.team", self.team_channel)

        # Synchronous helper: receive_msg(channel) returns the next message.
        self.receive_msg = async_to_sync(channel_layer.receive)

        self.kfet_open = OpenKfet(
            cache_prefix="test_kfetopen_%s" % random.randrange(2**20)
        )
        self.addCleanup(self.kfet_open.clear_cache)

        # Same patch as in OpenKfetViewsTest: the views must act on this
        # test's instance, otherwise the POSTs below would change a different
        # object than the one the scenario configures and cleans up.
        views_patcher = mock.patch("kfet.open.views.kfet_open", self.kfet_open)
        views_patcher.start()
        self.addCleanup(views_patcher.stop)

    async def ws_connect(self, ws_communicator):
        """Connect the communicator and return its first JSON message."""
        connected, _ = await ws_communicator.connect()
        self.assertTrue(connected)
        return await ws_communicator.receive_json_from()

    def test_scenario_1(self):
        """Clients connect, door opens, enable force close."""
        # The door sends "I'm open!".
        self.c.post("/k-fet/open/raw_open", {"raw_open": True, "token": "plop"})

        # The anonymous user agrees...
        msg = self.receive_msg(self.channel)
        self.assertEqual(OpenKfet.OPENED, msg["status"])

        # ...and so does the root user.
        msg = self.receive_msg(self.team_channel)
        self.assertEqual(OpenKfet.OPENED, msg["status"])
        self.assertEqual(OpenKfet.OPENED, msg["admin_status"])

        # The admin says "no, it's closed".
        self.r_c.post("/k-fet/open/force_close", {"force_close": True})

        # So the anonymous user sees it as closed...
        msg = self.receive_msg(self.channel)
        self.assertEqual(OpenKfet.CLOSED, msg["status"])

        # ...and the root user too,
        msg = self.receive_msg(self.team_channel)
        self.assertEqual(OpenKfet.CLOSED, msg["status"])
        # but root is told what is really going on.
        self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"])
        self.assertTrue(msg["force_close"])

    def test_scenario_2(self):
        """Starting falsely closed, clients connect, disable force close."""
        self.kfet_open.raw_open = True
        self.kfet_open.force_close = True

        # Broadcast from the instance configured above; a fresh OpenKfet()
        # would not be the object whose state was just set.
        async_to_sync(self.kfet_open.send_ws)()

        msg = self.receive_msg(self.channel)
        self.assertEqual(OpenKfet.CLOSED, msg["status"])

        msg = self.receive_msg(self.team_channel)
        self.assertEqual(OpenKfet.CLOSED, msg["status"])
        self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"])
        self.assertTrue(msg["force_close"])

        # The admin lifts the forced closure.
        self.r_c.post("/k-fet/open/force_close", {"force_close": False})

        msg = self.receive_msg(self.channel)
        self.assertEqual(OpenKfet.OPENED, msg["status"])

        msg = self.receive_msg(self.team_channel)
        self.assertEqual(OpenKfet.OPENED, msg["status"])
        self.assertEqual(OpenKfet.OPENED, msg["admin_status"])
        self.assertFalse(msg["force_close"])

    async def test_scenario_3(self):
        """Clients connect."""
        # Anonymous client (for websockets).
        self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")

        # An anonymous user only receives the status.
        msg = await self.ws_connect(self.c_ws)
        self.assertSetEqual({"status"}, set(msg))
        # Close the socket so the consumer task is not left pending.
        await self.c_ws.disconnect()

        # A client seen as team (kfet_is_team forced to True in both modules)
        # receives the admin fields as well.
        with mock.patch(
            "kfet.open.consumers.kfet_is_team", return_value=True
        ), mock.patch("kfet.open.open.kfet_is_team", return_value=True):
            self.r_c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open")

            msg = await self.ws_connect(self.r_c_ws)
            self.assertSetEqual(
                {"status", "admin_status", "force_close"}, set(msg)
            )
            await self.r_c_ws.disconnect()
|