import json
import random
from datetime import timedelta
from unittest import mock

from channels.channel import Group
from channels.test import ChannelTestCase, WSClient
from django.contrib.auth.models import AnonymousUser, Permission, User
from django.test import Client
from django.utils import timezone

from . import OpenKfet
from .consumers import OpenKfetConsumer


class OpenKfetTest(ChannelTestCase):
    """OpenKfet object unit-tests suite."""

    def setUp(self):
        self.kfet_open = OpenKfet(
            cache_prefix="test_kfetopen_%s" % random.randrange(2 ** 20)
        )
        self.addCleanup(self.kfet_open.clear_cache)

    def test_defaults(self):
        """Default values."""
        self.assertFalse(self.kfet_open.raw_open)
        self.assertIsNone(self.kfet_open.last_update)
        self.assertFalse(self.kfet_open.force_close)
        self.assertFalse(self.kfet_open.is_open)

    def test_raw_open(self):
        """Get and set raw_open; last_update is renewed."""
        for raw_open in [True, False]:
            prev_update = self.kfet_open.last_update
            self.kfet_open.raw_open = raw_open
            self.assertEqual(raw_open, self.kfet_open.raw_open)
            self.assertNotEqual(prev_update, self.kfet_open.last_update)

    def test_force_close(self):
        """Get and set force_close."""
        for force_close in [True, False]:
            self.kfet_open.force_close = force_close
            self.assertEqual(force_close, self.kfet_open.force_close)

    def test_is_open(self):
        """If force_close is disabled, is_open is raw_open."""
        self.kfet_open.force_close = False
        for raw_open in [True, False]:
            self.kfet_open.raw_open = raw_open
            self.assertEqual(raw_open, self.kfet_open.is_open)

    def test_is_open_force_close(self):
        """If force_close is enabled, is_open is False."""
        self.kfet_open.force_close = True
        for raw_open in [True, False]:
            self.kfet_open.raw_open = raw_open
            self.assertFalse(self.kfet_open.is_open)

    def test_status(self):
        # (raw_open, force_close, expected status, expected admin)
        cases = [
            (False, False, OpenKfet.CLOSED, OpenKfet.CLOSED),
            (False, True, OpenKfet.CLOSED, OpenKfet.CLOSED),
            (True, False, OpenKfet.OPENED, OpenKfet.OPENED),
            (True, True, OpenKfet.CLOSED, OpenKfet.FAKE_CLOSED),
        ]
        for raw_open, force_close, exp_stat, exp_adm_stat in cases:
            self.kfet_open.raw_open = raw_open
            self.kfet_open.force_close = force_close
            self.assertEqual(exp_stat, self.kfet_open.status())
            self.assertEqual(exp_adm_stat, self.kfet_open.admin_status())

    def test_status_unknown(self):
        self.kfet_open.raw_open = True
        self.kfet_open._last_update = timezone.now() - timedelta(days=30)
        self.assertEqual(OpenKfet.UNKNOWN, self.kfet_open.status())

    def test_export_user(self):
        """Export is limited for an anonymous user."""
        export = self.kfet_open.export(AnonymousUser())
        self.assertSetEqual(set(["status"]), set(export))

    def test_export_team(self):
        """Export all values for a team member."""
        user = User.objects.create_user("team", "", "team")
        user.user_permissions.add(Permission.objects.get(codename="is_team"))
        export = self.kfet_open.export(user)
        self.assertSetEqual(set(["status", "admin_status", "force_close"]), set(export))

    def test_send_ws(self):
        Group("kfet.open.base").add("test.open.base")
        Group("kfet.open.team").add("test.open.team")

        self.kfet_open.send_ws()

        recv_base = self.get_next_message("test.open.base", require=True)
        base = json.loads(recv_base["text"])
        self.assertSetEqual(set(["status"]), set(base))

        recv_admin = self.get_next_message("test.open.team", require=True)
        admin = json.loads(recv_admin["text"])
        self.assertSetEqual(set(["status", "admin_status", "force_close"]), set(admin))


class OpenKfetViewsTest(ChannelTestCase):
    """OpenKfet views unit-tests suite."""

    def setUp(self):
        # Need this (and here) because of '<client>.login' in setUp
        patcher_messages = mock.patch("gestioncof.signals.messages")
        patcher_messages.start()
        self.addCleanup(patcher_messages.stop)

        # get some permissions
        perms = {
            "kfet.is_team": Permission.objects.get(codename="is_team"),
            "kfet.can_force_close": Permission.objects.get(codename="can_force_close"),
        }

        # authenticated user and its client
        self.u = User.objects.create_user("user", "", "user")
        self.c = Client()
        self.c.login(username="user", password="user")

        # team user and its clients
        self.t = User.objects.create_user("team", "", "team")
        self.t.user_permissions.add(perms["kfet.is_team"])
        self.c_t = Client()
        self.c_t.login(username="team", password="team")

        # admin user and its client
        self.a = User.objects.create_user("admin", "", "admin")
        self.a.user_permissions.add(
            perms["kfet.is_team"], perms["kfet.can_force_close"]
        )
        self.c_a = Client()
        self.c_a.login(username="admin", password="admin")

        self.kfet_open = OpenKfet(
            cache_prefix="test_kfetopen_%s" % random.randrange(2 ** 20)
        )
        self.addCleanup(self.kfet_open.clear_cache)

        views_patcher = mock.patch("kfet.open.views.kfet_open", self.kfet_open)
        views_patcher.start()
        self.addCleanup(views_patcher.stop)

    def test_door(self):
        """Edit raw_status."""
        for sent, expected in [(1, True), (0, False)]:
            resp = Client().post(
                "/k-fet/open/raw_open", {"raw_open": sent, "token": "plop"}
            )
            self.assertEqual(200, resp.status_code)
            self.assertEqual(expected, self.kfet_open.raw_open)

    def test_force_close(self):
        """Edit force_close."""
        for sent, expected in [(1, True), (0, False)]:
            resp = self.c_a.post("/k-fet/open/force_close", {"force_close": sent})
            self.assertEqual(200, resp.status_code)
            self.assertEqual(expected, self.kfet_open.force_close)

    def test_force_close_forbidden(self):
        """Can't edit force_close without kfet.can_force_close permission."""
        clients = [Client(), self.c, self.c_t]
        for client in clients:
            resp = client.post("/k-fet/open/force_close", {"force_close": 0})
            self.assertEqual(403, resp.status_code)


class OpenKfetConsumerTest(ChannelTestCase):
    """OpenKfet consumer unit-tests suite."""

    def test_standard_user(self):
        """Lambda user is added to kfet.open.base group."""
        # setup anonymous client
        c = WSClient()

        # connect
        c.send_and_consume(
            "websocket.connect", path="/ws/k-fet/open", fail_on_none=True
        )

        # initialization data is replied on connection
        self.assertIsNotNone(c.receive())

        # client belongs to the 'kfet.open' group...
        OpenKfetConsumer.group_send("kfet.open.base", {"test": "plop"})
        self.assertEqual(c.receive(), {"test": "plop"})

        # ...but not to the 'kfet.open.admin' one
        OpenKfetConsumer.group_send("kfet.open.team", {"test": "plop"})
        self.assertIsNone(c.receive())

    @mock.patch("gestioncof.signals.messages")
    def test_team_user(self, mock_messages):
        """Team user is added to kfet.open.team group."""
        # setup team user and its client
        t = User.objects.create_user("team", "", "team")
        t.user_permissions.add(Permission.objects.get(codename="is_team"))
        c = WSClient()
        c.force_login(t)

        # connect
        c.send_and_consume(
            "websocket.connect", path="/ws/k-fet/open", fail_on_none=True
        )

        # initialization data is replied on connection
        self.assertIsNotNone(c.receive())

        # client belongs to the 'kfet.open.admin' group...
        OpenKfetConsumer.group_send("kfet.open.team", {"test": "plop"})
        self.assertEqual(c.receive(), {"test": "plop"})

        # ... but not to the 'kfet.open' one
        OpenKfetConsumer.group_send("kfet.open.base", {"test": "plop"})
        self.assertIsNone(c.receive())


class OpenKfetScenarioTest(ChannelTestCase):
    """OpenKfet functionnal tests suite."""

    def setUp(self):
        # Need this (and here) because of '<client>.login' in setUp
        patcher_messages = mock.patch("gestioncof.signals.messages")
        patcher_messages.start()
        self.addCleanup(patcher_messages.stop)

        # anonymous client (for views)
        self.c = Client()
        # anonymous client (for websockets)
        self.c_ws = WSClient()

        # root user
        self.r = User.objects.create_superuser("root", "", "root")
        # its client (for views)
        self.r_c = Client()
        self.r_c.login(username="root", password="root")
        # its client (for websockets)
        self.r_c_ws = WSClient()
        self.r_c_ws.force_login(self.r)

        self.kfet_open = OpenKfet(
            cache_prefix="test_kfetopen_%s" % random.randrange(2 ** 20)
        )
        self.addCleanup(self.kfet_open.clear_cache)

    def ws_connect(self, ws_client):
        ws_client.send_and_consume(
            "websocket.connect", path="/ws/k-fet/open", fail_on_none=True
        )
        return ws_client.receive(json=True)

    def test_scenario_0(self):
        """Clients connect."""
        # test for anonymous user
        msg = self.ws_connect(self.c_ws)
        self.assertSetEqual(set(["status"]), set(msg))

        # test for root user
        msg = self.ws_connect(self.r_c_ws)
        self.assertSetEqual(set(["status", "admin_status", "force_close"]), set(msg))

    def test_scenario_1(self):
        """Clients connect, door opens, enable force close."""
        self.ws_connect(self.c_ws)
        self.ws_connect(self.r_c_ws)

        # door sent "I'm open!"
        self.c.post("/k-fet/open/raw_open", {"raw_open": True, "token": "plop"})

        # anonymous user agree
        msg = self.c_ws.receive(json=True)
        self.assertEqual(OpenKfet.OPENED, msg["status"])

        # root user too
        msg = self.r_c_ws.receive(json=True)
        self.assertEqual(OpenKfet.OPENED, msg["status"])
        self.assertEqual(OpenKfet.OPENED, msg["admin_status"])

        # admin says "no it's closed"
        self.r_c.post("/k-fet/open/force_close", {"force_close": True})

        # so anonymous user see it's closed
        msg = self.c_ws.receive(json=True)
        self.assertEqual(OpenKfet.CLOSED, msg["status"])

        # root user too
        msg = self.r_c_ws.receive(json=True)
        self.assertEqual(OpenKfet.CLOSED, msg["status"])
        # but root knows things
        self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"])
        self.assertTrue(msg["force_close"])

    def test_scenario_2(self):
        """Starting falsely closed, clients connect, disable force close."""
        self.kfet_open.raw_open = True
        self.kfet_open.force_close = True

        msg = self.ws_connect(self.c_ws)
        self.assertEqual(OpenKfet.CLOSED, msg["status"])

        msg = self.ws_connect(self.r_c_ws)
        self.assertEqual(OpenKfet.CLOSED, msg["status"])
        self.assertEqual(OpenKfet.FAKE_CLOSED, msg["admin_status"])
        self.assertTrue(msg["force_close"])

        self.r_c.post("/k-fet/open/force_close", {"force_close": False})

        msg = self.c_ws.receive(json=True)
        self.assertEqual(OpenKfet.OPENED, msg["status"])

        msg = self.r_c_ws.receive(json=True)
        self.assertEqual(OpenKfet.OPENED, msg["status"])
        self.assertEqual(OpenKfet.OPENED, msg["admin_status"])
        self.assertFalse(msg["force_close"])