319 lines
12 KiB
Python
319 lines
12 KiB
Python
import json
|
|
import threading
|
|
from datetime import timedelta
|
|
from unittest import mock
|
|
|
|
from channels.channel import Group
|
|
from channels.test import ChannelTestCase, WSClient
|
|
from django.contrib.auth.models import AnonymousUser, Permission, User
|
|
from django.test import Client
|
|
from django.utils import timezone
|
|
|
|
from . import OpenKfet
|
|
from .consumers import OpenKfetConsumer
|
|
|
|
|
|
class OpenKfetTest(ChannelTestCase):
    """Unit tests exercising the OpenKfet object directly."""

    def setUp(self):
        # The cache prefix embeds the current thread id; the instance's cache
        # is cleared again once the test is over.
        prefix = "test_kfetopen_%s" % threading.get_ident()
        self.kfet_open = OpenKfet(cache_prefix=prefix)
        self.addCleanup(self.kfet_open.clear_cache)

    def test_defaults(self):
        """A fresh instance is closed, not force-closed and never updated."""
        kfet = self.kfet_open
        self.assertFalse(kfet.raw_open)
        self.assertIsNone(kfet.last_update)
        self.assertFalse(kfet.force_close)
        self.assertFalse(kfet.is_open)

    def test_raw_open(self):
        """Writing raw_open stores the value and changes last_update."""
        kfet = self.kfet_open
        for value in (True, False):
            before = kfet.last_update
            kfet.raw_open = value
            self.assertEqual(value, kfet.raw_open)
            self.assertNotEqual(before, kfet.last_update)

    def test_force_close(self):
        """Writing force_close stores the value."""
        kfet = self.kfet_open
        for value in (True, False):
            kfet.force_close = value
            self.assertEqual(value, kfet.force_close)

    def test_is_open(self):
        """With force_close off, is_open mirrors raw_open."""
        kfet = self.kfet_open
        kfet.force_close = False
        for value in (True, False):
            kfet.raw_open = value
            self.assertEqual(value, kfet.is_open)

    def test_is_open_force_close(self):
        """With force_close on, is_open stays False whatever raw_open is."""
        kfet = self.kfet_open
        kfet.force_close = True
        for value in (True, False):
            kfet.raw_open = value
            self.assertFalse(kfet.is_open)

    def test_status(self):
        """status() and admin_status() for every raw_open/force_close pair."""
        expectations = (
            # raw_open, force_close, public status, admin status
            (False, False, OpenKfet.CLOSED, OpenKfet.CLOSED),
            (False, True, OpenKfet.CLOSED, OpenKfet.CLOSED),
            (True, False, OpenKfet.OPENED, OpenKfet.OPENED),
            (True, True, OpenKfet.CLOSED, OpenKfet.FAKE_CLOSED),
        )
        kfet = self.kfet_open
        for door, forced, public, admin in expectations:
            kfet.raw_open = door
            kfet.force_close = forced
            self.assertEqual(public, kfet.status())
            self.assertEqual(admin, kfet.admin_status())

    def test_status_unknown(self):
        """status() is UNKNOWN once the last update is 30 days in the past."""
        self.kfet_open.raw_open = True
        # Overwrite the private timestamp after the setter ran, so the value
        # under test is the backdated one.
        stale = timezone.now() - timedelta(days=30)
        self.kfet_open._last_update = stale
        self.assertEqual(OpenKfet.UNKNOWN, self.kfet_open.status())

    def test_export_user(self):
        """An anonymous user only gets the 'status' key."""
        exported = self.kfet_open.export(AnonymousUser())
        self.assertSetEqual({"status"}, set(exported))

    def test_export_team(self):
        """A user holding the is_team permission gets every key."""
        member = User.objects.create_user("team", "", "team")
        is_team = Permission.objects.get(codename="is_team")
        member.user_permissions.add(is_team)

        exported = self.kfet_open.export(member)
        self.assertSetEqual({"status", "admin_status", "force_close"}, set(exported))

    def test_send_ws(self):
        """send_ws() sends the limited export to base and the full one to team."""
        # Subscribe one test channel to each group so messages can be read back.
        Group("kfet.open.base").add("test.open.base")
        Group("kfet.open.team").add("test.open.team")

        self.kfet_open.send_ws()

        base_message = self.get_next_message("test.open.base", require=True)
        base_payload = json.loads(base_message["text"])
        self.assertSetEqual({"status"}, set(base_payload))

        team_message = self.get_next_message("test.open.team", require=True)
        team_payload = json.loads(team_message["text"])
        self.assertSetEqual(
            {"status", "admin_status", "force_close"}, set(team_payload)
        )
|
|
|
|
|
|
class OpenKfetViewsTest(ChannelTestCase):
    """Unit tests for the OpenKfet HTTP views."""

    def _create_logged_in(self, name, *perms):
        """Create user *name* (password equal to its name) and log it in.

        Any *perms* given are added to the user's permissions before login.
        Returns the ``(user, client)`` pair.
        """
        user = User.objects.create_user(name, "", name)
        if perms:
            user.user_permissions.add(*perms)
        client = Client()
        client.login(username=name, password=name)
        return user, client

    def setUp(self):
        # Must be patched here, before any '<client>.login' call made below.
        messages_patch = mock.patch("gestioncof.signals.messages")
        messages_patch.start()
        self.addCleanup(messages_patch.stop)

        # Permissions handed out to the users created below.
        is_team = Permission.objects.get(codename="is_team")
        can_force_close = Permission.objects.get(codename="can_force_close")

        # Plain authenticated user.
        self.u, self.c = self._create_logged_in("user")
        # Team member.
        self.t, self.c_t = self._create_logged_in("team", is_team)
        # Admin: team member who may also force-close.
        self.a, self.c_a = self._create_logged_in("admin", is_team, can_force_close)

        prefix = "test_kfetopen_%s" % threading.get_ident()
        self.kfet_open = OpenKfet(cache_prefix=prefix)
        self.addCleanup(self.kfet_open.clear_cache)

        # Make the views module use the test instance for the test's duration.
        kfet_open_patch = mock.patch("kfet.open.views.kfet_open", self.kfet_open)
        kfet_open_patch.start()
        self.addCleanup(kfet_open_patch.stop)

    def test_door(self):
        """Posting to the raw_open endpoint updates raw_open."""
        for posted, stored in ((1, True), (0, False)):
            # Anonymous client; the request carries a "token" field
            # (presumably the door's credential -- confirm against the view).
            payload = {"raw_open": posted, "token": "plop"}
            response = Client().post("/k-fet/open/raw_open", payload)
            self.assertEqual(200, response.status_code)
            self.assertEqual(stored, self.kfet_open.raw_open)

    def test_force_close(self):
        """The admin client can update force_close through the view."""
        for posted, stored in ((1, True), (0, False)):
            payload = {"force_close": posted}
            response = self.c_a.post("/k-fet/open/force_close", payload)
            self.assertEqual(200, response.status_code)
            self.assertEqual(stored, self.kfet_open.force_close)

    def test_force_close_forbidden(self):
        """Clients lacking kfet.can_force_close get a 403 on force_close."""
        # Anonymous, plain user and team member, in that order.
        for client in (Client(), self.c, self.c_t):
            response = client.post("/k-fet/open/force_close", {"force_close": 0})
            self.assertEqual(403, response.status_code)
|
|
|
|
|
|
class OpenKfetConsumerTest(ChannelTestCase):
    """Unit tests for the OpenKfet websocket consumer."""

    def test_standard_user(self):
        """An anonymous client ends up in the kfet.open.base group only."""
        ws_client = WSClient()
        payload = {"test": "plop"}

        ws_client.send_and_consume(
            "websocket.connect", path="/ws/k-fet/open", fail_on_none=True
        )

        # Something is sent back as soon as the connection is made.
        self.assertIsNotNone(ws_client.receive())

        # A message to 'kfet.open.base' reaches the client...
        OpenKfetConsumer.group_send("kfet.open.base", payload)
        self.assertEqual(ws_client.receive(), {"test": "plop"})

        # ...whereas a message to 'kfet.open.team' does not.
        OpenKfetConsumer.group_send("kfet.open.team", payload)
        self.assertIsNone(ws_client.receive())

    @mock.patch("gestioncof.signals.messages")
    def test_team_user(self, mock_messages):
        """A client with the is_team permission ends up in kfet.open.team only."""
        team_user = User.objects.create_user("team", "", "team")
        is_team = Permission.objects.get(codename="is_team")
        team_user.user_permissions.add(is_team)

        ws_client = WSClient()
        ws_client.force_login(team_user)
        payload = {"test": "plop"}

        ws_client.send_and_consume(
            "websocket.connect", path="/ws/k-fet/open", fail_on_none=True
        )

        # Something is sent back as soon as the connection is made.
        self.assertIsNotNone(ws_client.receive())

        # A message to 'kfet.open.team' reaches the client...
        OpenKfetConsumer.group_send("kfet.open.team", payload)
        self.assertEqual(ws_client.receive(), {"test": "plop"})

        # ...whereas a message to 'kfet.open.base' does not.
        OpenKfetConsumer.group_send("kfet.open.base", payload)
        self.assertIsNone(ws_client.receive())
|
|
|
|
|
|
class OpenKfetScenarioTest(ChannelTestCase):
    """Functional tests combining OpenKfet views and websockets."""

    def setUp(self):
        # Must be patched here, before the '<client>.login' call made below.
        messages_patch = mock.patch("gestioncof.signals.messages")
        messages_patch.start()
        self.addCleanup(messages_patch.stop)

        # Anonymous visitor: one client for views, one for websockets.
        self.c = Client()
        self.c_ws = WSClient()

        # Superuser, again with a views client and a websocket client.
        self.r = User.objects.create_superuser("root", "", "root")
        self.r_c = Client()
        self.r_c.login(username="root", password="root")
        self.r_c_ws = WSClient()
        self.r_c_ws.force_login(self.r)

        # NOTE(review): unlike OpenKfetViewsTest.setUp, nothing here patches
        # 'kfet.open.views.kfet_open' with this instance -- confirm the views
        # and the consumer actually share state with it.
        prefix = "test_kfetopen_%s" % threading.get_ident()
        self.kfet_open = OpenKfet(cache_prefix=prefix)
        self.addCleanup(self.kfet_open.clear_cache)

    def ws_connect(self, ws_client):
        """Connect *ws_client* to /ws/k-fet/open and return its first JSON message."""
        ws_client.send_and_consume(
            "websocket.connect", path="/ws/k-fet/open", fail_on_none=True
        )
        first_message = ws_client.receive(json=True)
        return first_message

    def test_scenario_0(self):
        """Both clients connect and receive their initial message."""
        # The anonymous client only sees the public status.
        anon_init = self.ws_connect(self.c_ws)
        self.assertSetEqual({"status"}, set(anon_init))

        # The root client sees every key.
        root_init = self.ws_connect(self.r_c_ws)
        self.assertSetEqual(
            {"status", "admin_status", "force_close"}, set(root_init)
        )

    def test_scenario_1(self):
        """Clients connect, the door opens, then force close is enabled."""
        self.ws_connect(self.c_ws)
        self.ws_connect(self.r_c_ws)

        # The door reports that it is open.
        self.c.post("/k-fet/open/raw_open", {"raw_open": True, "token": "plop"})

        # The anonymous client is told it is opened...
        anon_msg = self.c_ws.receive(json=True)
        self.assertEqual(OpenKfet.OPENED, anon_msg["status"])

        # ...and so is root, for both statuses.
        root_msg = self.r_c_ws.receive(json=True)
        self.assertEqual(OpenKfet.OPENED, root_msg["status"])
        self.assertEqual(OpenKfet.OPENED, root_msg["admin_status"])

        # Root overrides the door by enabling force close.
        self.r_c.post("/k-fet/open/force_close", {"force_close": True})

        # The anonymous client now sees it closed.
        anon_msg = self.c_ws.receive(json=True)
        self.assertEqual(OpenKfet.CLOSED, anon_msg["status"])

        # Root gets the same public status, plus the details behind it.
        root_msg = self.r_c_ws.receive(json=True)
        self.assertEqual(OpenKfet.CLOSED, root_msg["status"])
        self.assertEqual(OpenKfet.FAKE_CLOSED, root_msg["admin_status"])
        self.assertTrue(root_msg["force_close"])

    def test_scenario_2(self):
        """Start force-closed while open, connect, then disable force close."""
        self.kfet_open.raw_open = True
        self.kfet_open.force_close = True

        # On connection the anonymous client sees it closed.
        anon_msg = self.ws_connect(self.c_ws)
        self.assertEqual(OpenKfet.CLOSED, anon_msg["status"])

        # Root sees the same public status and the forced-close details.
        root_msg = self.ws_connect(self.r_c_ws)
        self.assertEqual(OpenKfet.CLOSED, root_msg["status"])
        self.assertEqual(OpenKfet.FAKE_CLOSED, root_msg["admin_status"])
        self.assertTrue(root_msg["force_close"])

        # Root lifts the force close.
        self.r_c.post("/k-fet/open/force_close", {"force_close": False})

        # Both clients are now told it is opened.
        anon_msg = self.c_ws.receive(json=True)
        self.assertEqual(OpenKfet.OPENED, anon_msg["status"])

        root_msg = self.r_c_ws.receive(json=True)
        self.assertEqual(OpenKfet.OPENED, root_msg["status"])
        self.assertEqual(OpenKfet.OPENED, root_msg["admin_status"])
        self.assertFalse(root_msg["force_close"])
|