import random from datetime import timedelta from unittest import mock from asgiref.sync import async_to_sync, sync_to_async from channels.auth import AuthMiddlewareStack from channels.consumer import get_channel_layer from channels.testing import WebsocketCommunicator from django.contrib.auth.models import AnonymousUser, Permission, User from django.test import Client, TestCase from django.utils import timezone from . import OpenKfet from .consumers import OpenKfetConsumer def ws_communicator(cls, path: str, headers=[]): return WebsocketCommunicator(AuthMiddlewareStack(cls.as_asgi()), path, headers) class OpenKfetTest(TestCase): """OpenKfet object unit-tests suite.""" def setUp(self): self.kfet_open = OpenKfet( cache_prefix="test_kfetopen_%s" % random.randrange(2**20) ) self.addCleanup(self.kfet_open.clear_cache) def test_defaults(self): """Default values.""" self.assertFalse(self.kfet_open.raw_open) self.assertIsNone(self.kfet_open.last_update) self.assertFalse(self.kfet_open.force_close) self.assertFalse(self.kfet_open.is_open) def test_raw_open(self): """Get and set raw_open; last_update is renewed.""" for raw_open in [True, False]: prev_update = self.kfet_open.last_update self.kfet_open.raw_open = raw_open self.assertEqual(raw_open, self.kfet_open.raw_open) self.assertNotEqual(prev_update, self.kfet_open.last_update) def test_force_close(self): """Get and set force_close.""" for force_close in [True, False]: self.kfet_open.force_close = force_close self.assertEqual(force_close, self.kfet_open.force_close) def test_is_open(self): """If force_close is disabled, is_open is raw_open.""" self.kfet_open.force_close = False for raw_open in [True, False]: self.kfet_open.raw_open = raw_open self.assertEqual(raw_open, self.kfet_open.is_open) def test_is_open_force_close(self): """If force_close is enabled, is_open is False.""" self.kfet_open.force_close = True for raw_open in [True, False]: self.kfet_open.raw_open = raw_open self.assertFalse(self.kfet_open.is_open) def test_status(self): # (raw_open, force_close, expected status, expected admin) cases = [ (False, False, OpenKfet.CLOSED, OpenKfet.CLOSED), (False, True, OpenKfet.CLOSED, OpenKfet.CLOSED), (True, False, OpenKfet.OPENED, OpenKfet.OPENED), (True, True, OpenKfet.CLOSED, OpenKfet.FAKE_CLOSED), ] for raw_open, force_close, exp_stat, exp_adm_stat in cases: self.kfet_open.raw_open = raw_open self.kfet_open.force_close = force_close self.assertEqual(exp_stat, self.kfet_open.status()) self.assertEqual(exp_adm_stat, self.kfet_open.admin_status()) def test_status_unknown(self): self.kfet_open.raw_open = True self.kfet_open._last_update = timezone.now() - timedelta(days=30) self.assertEqual(OpenKfet.UNKNOWN, self.kfet_open.status()) def test_export_user(self): """Export is limited for an anonymous user.""" export = self.kfet_open.export(AnonymousUser()) self.assertSetEqual(set(["status", "type"]), set(export)) def test_export_team(self): """Export all values for a team member.""" user = User.objects.create_user("team", "", "team") is_team = Permission.objects.get( codename="is_team", content_type__app_label="kfet" ) user.user_permissions.add(is_team) export = self.kfet_open.export(user) self.assertSetEqual( set(["status", "admin_status", "force_close", "type"]), set(export) ) async def test_send_ws(self): channel_layer = get_channel_layer() base_channel = await channel_layer.new_channel() team_channel = await channel_layer.new_channel() await channel_layer.group_add("kfet.open.base", base_channel) await channel_layer.group_add("kfet.open.team", team_channel) await self.kfet_open.send_ws() base = await channel_layer.receive(base_channel) self.assertSetEqual(set(["status", "type"]), set(base)) team = await channel_layer.receive(team_channel) self.assertSetEqual( set(["status", "admin_status", "force_close", "type"]), set(team) ) class OpenKfetViewsTest(TestCase): """OpenKfet views unit-tests suite.""" def setUp(self): # Need this (and here) because of '.login' in setUp patcher_messages = mock.patch("gestioncof.signals.messages") patcher_messages.start() self.addCleanup(patcher_messages.stop) # get some permissions perms = { "kfet.is_team": Permission.objects.get( codename="is_team", content_type__app_label="kfet" ), "kfet.can_force_close": Permission.objects.get( codename="can_force_close", content_type__app_label="kfet" ), } # authenticated user and its client self.u = User.objects.create_user("user", "", "user") self.c = Client() self.c.login(username="user", password="user") # team user and its clients self.t = User.objects.create_user("team", "", "team") self.t.user_permissions.add(perms["kfet.is_team"]) self.c_t = Client() self.c_t.login(username="team", password="team") # admin user and its client self.a = User.objects.create_user("admin", "", "admin") self.a.user_permissions.add( perms["kfet.is_team"], perms["kfet.can_force_close"] ) self.c_a = Client() self.c_a.login(username="admin", password="admin") self.kfet_open = OpenKfet( cache_prefix="test_kfetopen_%s" % random.randrange(2**20) ) self.addCleanup(self.kfet_open.clear_cache) views_patcher = mock.patch("kfet.open.views.kfet_open", self.kfet_open) views_patcher.start() self.addCleanup(views_patcher.stop) def test_door(self): """Edit raw_status.""" for sent, expected in [(1, True), (0, False)]: resp = Client().post( "/k-fet/open/raw_open", {"raw_open": sent, "token": "plop"} ) self.assertEqual(200, resp.status_code) self.assertEqual(expected, self.kfet_open.raw_open) def test_force_close(self): """Edit force_close.""" for sent, expected in [(1, True), (0, False)]: resp = self.c_a.post("/k-fet/open/force_close", {"force_close": sent}) self.assertEqual(200, resp.status_code) self.assertEqual(expected, self.kfet_open.force_close) def test_force_close_forbidden(self): """Can't edit force_close without kfet.can_force_close permission.""" clients = [Client(), self.c, self.c_t] for client in clients: resp = client.post("/k-fet/open/force_close", {"force_close": 0}) self.assertEqual(403, resp.status_code) class OpenKfetConsumerTest(TestCase): """OpenKfet consumer unit-tests suite.""" def setUp(self): t = User.objects.create_user("team", "", "team") is_team = Permission.objects.get( codename="is_team", content_type__app_label="kfet" ) t.user_permissions.add(is_team) self.team_user = t @async_to_sync async def test_standard_user(self): """Lambda user is added to kfet.open.base group.""" # setup anonymous client c = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") connected, _ = await c.connect() self.assertTrue(connected) # initialization data is replied on connection message = await c.receive_json_from() self.assertIsNotNone(message) # client belongs to the 'kfet.open' group... channel_layer = get_channel_layer() await channel_layer.group_send( "kfet.open.base", {"test": "plop", "type": "open.status"} ) message = await c.receive_json_from() self.assertEqual(message, {"test": "plop"}) # ...but not to the 'kfet.open.admin' one await channel_layer.group_send( "kfet.open.team", {"test": "plop", "type": "open.status"} ) self.assertTrue(await c.receive_nothing()) async def test_team_user(self): """Team user is added to kfet.open.team group.""" with mock.patch("gestioncof.signals.messages"), mock.patch( "kfet.open.consumers.kfet_is_team", return_value=True ): c = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") connected, _ = await c.connect() channel_layer = get_channel_layer() self.assertTrue(connected) # initialization data is replied on connection message = await c.receive_json_from() self.assertIsNotNone(message) # client belongs to the 'kfet.open.team' group... await channel_layer.group_send( "kfet.open.team", {"test": "plop", "type": "open.status"} ) message = await c.receive_json_from() self.assertEqual(message, {"test": "plop"}) # ...but not to the 'kfet.open' one await channel_layer.group_send( "kfet.open.base", {"test": "plop", "type": "open.status"} ) self.assertTrue(await c.receive_nothing()) class OpenKfetScenarioTest(TestCase): """OpenKfet functionnal tests suite.""" def setUp(self): # Need this (and here) because of '.login' in setUp patcher_messages = mock.patch("gestioncof.signals.messages") patcher_messages.start() self.addCleanup(patcher_messages.stop) # anonymous client (for views) self.c = Client() # root user self.r = User.objects.create_superuser("root", "", "root") # root client self.r_c = Client() self.r_c.login(username="root", password="root") self.kfet_open = OpenKfet( cache_prefix="test_kfetopen_%s" % random.randrange(2**20) ) self.addCleanup(self.kfet_open.clear_cache) async def ws_connect(self, ws_client): c, _ = await ws_client.connect() self.assertTrue(c) return await ws_client.receive_json_from() async def test_scenario_0(self): """Clients connect.""" # anonymous client (for websockets) self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") # test for anonymous user msg = await self.ws_connect(self.c_ws) self.assertSetEqual(set(["status"]), set(msg)) # test for root user with mock.patch( "kfet.open.consumers.kfet_is_team", return_value=True ), mock.patch("kfet.open.open.kfet_is_team", return_value=True): self.r_c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") msg = await self.ws_connect(self.r_c_ws) self.assertSetEqual( set(["status", "admin_status", "force_close"]), set(msg) ) async def test_scenario_1(self): """Clients connect, door opens, enable force close.""" self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") await self.ws_connect(self.c_ws) with mock.patch( "kfet.open.consumers.kfet_is_team", return_value=True ), mock.patch("kfet.open.open.kfet_is_team", return_value=True): self.r_c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") await self.ws_connect(self.r_c_ws) # door sent "I'm open!" await sync_to_async(self.c.post)( "/k-fet/open/raw_open", {"raw_open": True, "token": "plop"} ) # anonymous user agree msg = await self.c_ws.receive_json_from() self.assertEqual(OpenKfet.OPENED, msg["status"]) # root user too msg = await self.r_c_ws.receive_json_from() self.assertEqual(OpenKfet.OPENED, msg["status"]) self.assertEqual(OpenKfet.OPENED, msg["admin_status"]) # admin says "no it's closed" await sync_to_async(self.r_c.post)( "/k-fet/open/force_close", {"force_close": True} ) # so anonymous user see it's closed msg = await self.c_ws.receive_json_from() self.assertEqual(OpenKfet.CLOSED, msg["status"]) # root user too msg = await self.r_c_ws.receive_json_from() self.assertEqual(OpenKfet.CLOSED, msg["status"]) # but root knows things self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"]) self.assertTrue(msg["force_close"]) async def test_scenario_2(self): """Starting falsely closed, clients connect, disable force close.""" self.kfet_open.raw_open = True self.kfet_open.force_close = True self.c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") msg = await self.ws_connect(self.c_ws) self.assertEqual(OpenKfet.CLOSED, msg["status"]) with mock.patch( "kfet.open.consumers.kfet_is_team", return_value=True ), mock.patch("kfet.open.open.kfet_is_team", return_value=True): self.r_c_ws = ws_communicator(OpenKfetConsumer, "/ws/k-fet/open") msg = await self.ws_connect(self.r_c_ws) self.assertEqual(OpenKfet.CLOSED, msg["status"]) self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"]) self.assertTrue(msg["force_close"]) await sync_to_async(self.r_c.post)( "/k-fet/open/force_close", {"force_close": False} ) msg = await self.c_ws.receive_json_from() self.assertEqual(OpenKfet.OPENED, msg["status"]) msg = await self.r_c_ws.receive_json_from() self.assertEqual(OpenKfet.OPENED, msg["status"]) self.assertEqual(OpenKfet.OPENED, msg["admin_status"]) self.assertFalse(msg["force_close"])