# kpsul/kfet/open/open.py

from datetime import timedelta
from django.utils import timezone
from ..decorators import kfet_is_team
from ..utils import CachedMixin
class OpenKfet(CachedMixin, object):
    """Manage the "open" status of a place.

    Stores raw data (e.g. sent by a raspberry) and user-set values
    (such as ``force_close``).

    Setting a different ``cache_prefix`` allows managing different places.
    Current state persists through cache.
    """

    # Status is reported as unknown once the last raw update is this old.
    time_unknown = timedelta(minutes=15)

    # Public status values.
    OPENED = "opened"
    CLOSED = "closed"
    UNKNOWN = "unknown"
    # Admin status: extra value only produced by `admin_status`.
    FAKE_CLOSED = "fake_closed"

    # Cached attributes config: attribute name -> default value.
    # NOTE(review): presumably consumed by CachedMixin -- confirm in ..utils.
    cached = {"_raw_open": False, "_last_update": None, "force_close": False}
    cache_prefix = "kfetopen"

    @property
    def raw_open(self):
        """Raw open flag, as last reported.

        Defined as a property so that `last_update` is refreshed whenever
        `raw_open` is assigned.
        """
        return self._raw_open

    @raw_open.setter
    def raw_open(self, value):
        # Timestamp first, then the value (same order as before).
        self._last_update = timezone.now()
        self._raw_open = value

    @property
    def last_update(self):
        """Time of the last `raw_open` assignment, or None.

        Read-only: exposed without a setter to prevent `last_update` from
        being set directly.
        """
        return self._last_update

    @property
    def is_open(self):
        """Open flag taking `force_close` into account.

        Returns False whenever `force_close` is truthy, `raw_open` otherwise.
        """
        if self.force_close:
            return False
        return self.raw_open

    def status(self):
        """Return the public status.

        Returns:
            (str): `UNKNOWN` when no update was ever recorded or when the
            last one is at least `time_unknown` old; otherwise `OPENED` or
            `CLOSED` according to `is_open`.
        """
        # Read once so that both checks below see the same value.
        last_update = self.last_update
        if last_update is None:
            return self.UNKNOWN
        if timezone.now() - last_update >= self.time_unknown:
            return self.UNKNOWN
        return self.OPENED if self.is_open else self.CLOSED

    def admin_status(self, status=None):
        """Return the status as shown to admins.

        Args:
            status (str, optional): An already computed public status.
                Computed with `status()` when omitted.

        Returns:
            (str): `FAKE_CLOSED` when the public status is `CLOSED` although
            `raw_open` is truthy; the given public status otherwise.
        """
        if status is None:
            status = self.status()
        if status == self.CLOSED and self.raw_open:
            return self.FAKE_CLOSED
        return status

    def _export(self):
        """Export internal state.

        Used by WS initialization and updates.

        Returns:
            (tuple): (base, team)
                - team for team users.
                - base for others.
        """
        status = self.status()
        base = {"status": status}
        # The team payload is the base payload plus restricted fields.
        team = dict(
            base,
            admin_status=self.admin_status(status),
            force_close=self.force_close,
        )
        return base, team

    def export(self, user):
        """Export internal state for a given user.

        Args:
            user: User passed to `kfet_is_team` to pick the payload.

        Returns:
            (dict): Internal state. Only variables visible for the user are
            exported, according to its permissions.
        """
        base, team = self._export()
        return team if kfet_is_team(user) else base

    def send_ws(self):
        """Send internal state to websocket channels.

        The base payload goes to the "kfet.open.base" group and the team
        payload to the "kfet.open.team" group.
        """
        # Imported here rather than at module level.
        # NOTE(review): presumably to avoid a circular import -- confirm.
        from .consumers import OpenKfetConsumer

        base, team = self._export()
        OpenKfetConsumer.group_send("kfet.open.base", base)
        OpenKfetConsumer.group_send("kfet.open.team", team)
# Module-level instance, created at import time with the class defaults
# (cache_prefix "kfetopen").
kfet_open = OpenKfet()