forked from DGNum/gestioCOF
64c792b11f
In some places we used to refer to permissions based on their codename only (the part after the dot "." in the following examples) which can be ambiguous. Typically, we might define permissions like "bds.is_team" or "cof.is_team" in the near future ;)
325 lines
12 KiB
Python
325 lines
12 KiB
Python
import json
|
|
import random
|
|
from datetime import timedelta
|
|
from unittest import mock
|
|
|
|
from channels.channel import Group
|
|
from channels.test import ChannelTestCase, WSClient
|
|
from django.contrib.auth.models import AnonymousUser, Permission, User
|
|
from django.test import Client
|
|
from django.utils import timezone
|
|
|
|
from . import OpenKfet
|
|
from .consumers import OpenKfetConsumer
|
|
|
|
|
|
class OpenKfetTest(ChannelTestCase):
    """OpenKfet object unit-tests suite.

    Every test runs against a fresh ``OpenKfet`` instance created in
    ``setUp`` with a randomised cache prefix; its cache is cleared on cleanup.
    """

    def setUp(self):
        # The prefix is randomised per test; presumably this keeps the cache
        # entries of concurrent or successive test runs apart.
        self.kfet_open = OpenKfet(
            cache_prefix="test_kfetopen_%s" % random.randrange(2 ** 20)
        )
        self.addCleanup(self.kfet_open.clear_cache)

    def test_defaults(self):
        """Default values."""
        self.assertFalse(self.kfet_open.raw_open)
        self.assertIsNone(self.kfet_open.last_update)
        self.assertFalse(self.kfet_open.force_close)
        self.assertFalse(self.kfet_open.is_open)

    def test_raw_open(self):
        """Get and set raw_open; last_update is renewed."""
        for raw_open in [True, False]:
            # subTest reports which value failed and still runs the other one.
            with self.subTest(raw_open=raw_open):
                prev_update = self.kfet_open.last_update
                self.kfet_open.raw_open = raw_open
                self.assertEqual(raw_open, self.kfet_open.raw_open)
                self.assertNotEqual(prev_update, self.kfet_open.last_update)

    def test_force_close(self):
        """Get and set force_close."""
        for force_close in [True, False]:
            with self.subTest(force_close=force_close):
                self.kfet_open.force_close = force_close
                self.assertEqual(force_close, self.kfet_open.force_close)

    def test_is_open(self):
        """If force_close is disabled, is_open is raw_open."""
        self.kfet_open.force_close = False
        for raw_open in [True, False]:
            with self.subTest(raw_open=raw_open):
                self.kfet_open.raw_open = raw_open
                self.assertEqual(raw_open, self.kfet_open.is_open)

    def test_is_open_force_close(self):
        """If force_close is enabled, is_open is False."""
        self.kfet_open.force_close = True
        for raw_open in [True, False]:
            with self.subTest(raw_open=raw_open):
                self.kfet_open.raw_open = raw_open
                self.assertFalse(self.kfet_open.is_open)

    def test_status(self):
        """status() and admin_status() for each raw_open/force_close pair."""
        # (raw_open, force_close, expected status, expected admin status)
        cases = [
            (False, False, OpenKfet.CLOSED, OpenKfet.CLOSED),
            (False, True, OpenKfet.CLOSED, OpenKfet.CLOSED),
            (True, False, OpenKfet.OPENED, OpenKfet.OPENED),
            (True, True, OpenKfet.CLOSED, OpenKfet.FAKE_CLOSED),
        ]
        for raw_open, force_close, exp_stat, exp_adm_stat in cases:
            with self.subTest(raw_open=raw_open, force_close=force_close):
                self.kfet_open.raw_open = raw_open
                self.kfet_open.force_close = force_close
                self.assertEqual(exp_stat, self.kfet_open.status())
                self.assertEqual(exp_adm_stat, self.kfet_open.admin_status())

    def test_status_unknown(self):
        """status() is UNKNOWN once the last update is 30 days old."""
        self.kfet_open.raw_open = True
        # Backdate the update timestamp directly through the private attribute.
        self.kfet_open._last_update = timezone.now() - timedelta(days=30)
        self.assertEqual(OpenKfet.UNKNOWN, self.kfet_open.status())

    def test_export_user(self):
        """Export is limited for an anonymous user."""
        export = self.kfet_open.export(AnonymousUser())
        self.assertSetEqual({"status"}, set(export))

    def test_export_team(self):
        """Export all values for a team member."""
        user = User.objects.create_user("team", "", "team")
        is_team = Permission.objects.get_by_natural_key("is_team", "kfet", "account")
        user.user_permissions.add(is_team)
        export = self.kfet_open.export(user)
        self.assertSetEqual({"status", "admin_status", "force_close"}, set(export))

    def test_send_ws(self):
        """send_ws() sends a limited payload to the base group, a full one to team."""
        # Register one test channel in each group so that the messages sent
        # to the groups can be read back below.
        Group("kfet.open.base").add("test.open.base")
        Group("kfet.open.team").add("test.open.team")

        self.kfet_open.send_ws()

        recv_base = self.get_next_message("test.open.base", require=True)
        base = json.loads(recv_base["text"])
        self.assertSetEqual({"status"}, set(base))

        recv_admin = self.get_next_message("test.open.team", require=True)
        admin = json.loads(recv_admin["text"])
        self.assertSetEqual({"status", "admin_status", "force_close"}, set(admin))
|
|
|
|
|
|
class OpenKfetViewsTest(ChannelTestCase):
    """OpenKfet views unit-tests suite.

    ``setUp`` creates three logged-in clients (plain user, team member,
    admin with the force-close permission) and swaps the ``kfet_open`` object
    used by ``kfet.open.views`` for a test-local instance.
    """

    def setUp(self):
        # Need this (and here) because of '<client>.login' in setUp:
        # the patch has to be active before any login call below.
        patcher_messages = mock.patch("gestioncof.signals.messages")
        patcher_messages.start()
        self.addCleanup(patcher_messages.stop)

        # Permissions, keyed by their full "<app_label>.<codename>" name.
        perms = {
            "kfet.is_team": Permission.objects.get_by_natural_key(
                "is_team", "kfet", "account"
            ),
            "kfet.can_force_close": Permission.objects.get_by_natural_key(
                "can_force_close", "kfet", "account"
            ),
        }

        # authenticated user (no specific permission) and its client
        self.u = User.objects.create_user("user", "", "user")
        self.c = Client()
        self.c.login(username="user", password="user")

        # team user and its client
        self.t = User.objects.create_user("team", "", "team")
        self.t.user_permissions.add(perms["kfet.is_team"])
        self.c_t = Client()
        self.c_t.login(username="team", password="team")

        # admin user (team member allowed to force close) and its client
        self.a = User.objects.create_user("admin", "", "admin")
        self.a.user_permissions.add(
            perms["kfet.is_team"], perms["kfet.can_force_close"]
        )
        self.c_a = Client()
        self.c_a.login(username="admin", password="admin")

        self.kfet_open = OpenKfet(
            cache_prefix="test_kfetopen_%s" % random.randrange(2 ** 20)
        )
        self.addCleanup(self.kfet_open.clear_cache)

        # Make the views operate on the test-local instance.
        views_patcher = mock.patch("kfet.open.views.kfet_open", self.kfet_open)
        views_patcher.start()
        self.addCleanup(views_patcher.stop)

    def test_door(self):
        """Edit raw_open."""
        for sent, expected in [(1, True), (0, False)]:
            # subTest reports which value failed and still runs the other one.
            with self.subTest(sent=sent):
                # The request is made by an anonymous client with a token.
                resp = Client().post(
                    "/k-fet/open/raw_open", {"raw_open": sent, "token": "plop"}
                )
                self.assertEqual(200, resp.status_code)
                self.assertEqual(expected, self.kfet_open.raw_open)

    def test_force_close(self):
        """Edit force_close."""
        for sent, expected in [(1, True), (0, False)]:
            with self.subTest(sent=sent):
                resp = self.c_a.post("/k-fet/open/force_close", {"force_close": sent})
                self.assertEqual(200, resp.status_code)
                self.assertEqual(expected, self.kfet_open.force_close)

    def test_force_close_forbidden(self):
        """Can't edit force_close without kfet.can_force_close permission."""
        # Labels only serve to identify the failing client in the report.
        clients = [("anonymous", Client()), ("user", self.c), ("team", self.c_t)]
        for label, client in clients:
            with self.subTest(client=label):
                resp = client.post("/k-fet/open/force_close", {"force_close": 0})
                self.assertEqual(403, resp.status_code)
|
|
|
|
|
|
class OpenKfetConsumerTest(ChannelTestCase):
    """OpenKfet consumer unit-tests suite."""

    def _connect(self, client):
        """Connect `client` to /ws/k-fet/open and expect an initial reply."""
        client.send_and_consume(
            "websocket.connect", path="/ws/k-fet/open", fail_on_none=True
        )
        # initialization data is replied on connection
        self.assertIsNotNone(client.receive())

    def test_standard_user(self):
        """Lambda user is added to kfet.open.base group."""
        anonymous_client = WSClient()
        self._connect(anonymous_client)

        # a message sent to the 'kfet.open.base' group reaches the client...
        OpenKfetConsumer.group_send("kfet.open.base", {"test": "plop"})
        self.assertEqual(anonymous_client.receive(), {"test": "plop"})

        # ...whereas one sent to the 'kfet.open.team' group does not
        OpenKfetConsumer.group_send("kfet.open.team", {"test": "plop"})
        self.assertIsNone(anonymous_client.receive())

    @mock.patch("gestioncof.signals.messages")
    def test_team_user(self, mock_messages):
        """Team user is added to kfet.open.team group."""
        # team user (holding the is_team permission) and its logged-in client
        team_user = User.objects.create_user("team", "", "team")
        team_perm = Permission.objects.get_by_natural_key("is_team", "kfet", "account")
        team_user.user_permissions.add(team_perm)
        team_client = WSClient()
        team_client.force_login(team_user)

        self._connect(team_client)

        # a message sent to the 'kfet.open.team' group reaches the client...
        OpenKfetConsumer.group_send("kfet.open.team", {"test": "plop"})
        self.assertEqual(team_client.receive(), {"test": "plop"})

        # ...whereas one sent to the 'kfet.open.base' group does not
        OpenKfetConsumer.group_send("kfet.open.base", {"test": "plop"})
        self.assertIsNone(team_client.receive())
|
|
|
|
|
|
class OpenKfetScenarioTest(ChannelTestCase):
    """OpenKfet functional tests suite.

    Scenarios combine HTTP requests on the views with websocket clients, for
    an anonymous visitor and for a superuser.
    """

    def setUp(self):
        # Need this (and here) because of '<client>.login' in setUp:
        # the patch has to be active before the login call below.
        patcher_messages = mock.patch("gestioncof.signals.messages")
        patcher_messages.start()
        self.addCleanup(patcher_messages.stop)

        # anonymous visitor: one client for views, one for websockets
        self.c = Client()
        self.c_ws = WSClient()

        # root user, again with a views client and a websockets client
        self.r = User.objects.create_superuser("root", "", "root")
        self.r_c = Client()
        self.r_c.login(username="root", password="root")
        self.r_c_ws = WSClient()
        self.r_c_ws.force_login(self.r)

        # NOTE(review): unlike OpenKfetViewsTest, this instance is not patched
        # into kfet.open.views here -- confirm that the views and consumers
        # exercised below actually share state with it.
        self.kfet_open = OpenKfet(
            cache_prefix="test_kfetopen_%s" % random.randrange(2 ** 20)
        )
        self.addCleanup(self.kfet_open.clear_cache)

    def ws_connect(self, ws_client):
        """Connect `ws_client` to /ws/k-fet/open; return the first message as JSON."""
        ws_client.send_and_consume(
            "websocket.connect", path="/ws/k-fet/open", fail_on_none=True
        )
        return ws_client.receive(json=True)

    def test_scenario_0(self):
        """Clients connect."""
        # the anonymous user only gets the public status
        anon_msg = self.ws_connect(self.c_ws)
        self.assertSetEqual({"status"}, set(anon_msg))

        # the root user gets the admin fields as well
        root_msg = self.ws_connect(self.r_c_ws)
        self.assertSetEqual(
            {"status", "admin_status", "force_close"}, set(root_msg)
        )

    def test_scenario_1(self):
        """Clients connect, door opens, enable force close."""
        self.ws_connect(self.c_ws)
        self.ws_connect(self.r_c_ws)

        # the door reports "I'm open!"
        self.c.post("/k-fet/open/raw_open", {"raw_open": True, "token": "plop"})

        # the anonymous user is told it is open
        anon_msg = self.c_ws.receive(json=True)
        self.assertEqual(OpenKfet.OPENED, anon_msg["status"])

        # and so is the root user, for both statuses
        root_msg = self.r_c_ws.receive(json=True)
        self.assertEqual(OpenKfet.OPENED, root_msg["status"])
        self.assertEqual(OpenKfet.OPENED, root_msg["admin_status"])

        # the admin overrides: "no, it's closed"
        self.r_c.post("/k-fet/open/force_close", {"force_close": True})

        # the anonymous user now sees it closed
        anon_msg = self.c_ws.receive(json=True)
        self.assertEqual(OpenKfet.CLOSED, anon_msg["status"])

        # the root user sees it closed too...
        root_msg = self.r_c_ws.receive(json=True)
        self.assertEqual(OpenKfet.CLOSED, root_msg["status"])
        # ...but is also told that the closure is forced
        self.assertEqual(OpenKfet.FAKE_CLOSED, root_msg["admin_status"])
        self.assertTrue(root_msg["force_close"])

    def test_scenario_2(self):
        """Starting falsely closed, clients connect, disable force close."""
        # initial state: door open, but closure forced
        self.kfet_open.raw_open = True
        self.kfet_open.force_close = True

        anon_msg = self.ws_connect(self.c_ws)
        self.assertEqual(OpenKfet.CLOSED, anon_msg["status"])

        root_msg = self.ws_connect(self.r_c_ws)
        self.assertEqual(OpenKfet.CLOSED, root_msg["status"])
        self.assertEqual(OpenKfet.FAKE_CLOSED, root_msg["admin_status"])
        self.assertTrue(root_msg["force_close"])

        # the root user lifts the forced closure
        self.r_c.post("/k-fet/open/force_close", {"force_close": False})

        anon_msg = self.c_ws.receive(json=True)
        self.assertEqual(OpenKfet.OPENED, anon_msg["status"])

        root_msg = self.r_c_ws.receive(json=True)
        self.assertEqual(OpenKfet.OPENED, root_msg["status"])
        self.assertEqual(OpenKfet.OPENED, root_msg["admin_status"])
        self.assertFalse(root_msg["force_close"])
|