from datetime import timedelta

from django.utils import timezone

from ..decorators import kfet_is_team
from ..utils import CachedMixin


class OpenKfet(CachedMixin, object):
    """Track the "open" status of a place.

    Two kinds of values are kept: the raw state (e.g. as reported by a
    raspberry) and values set by users (such as `force_close`).
    Using a different `cache_prefix` allows managing a different place.
    The current state persists through the cache.

    """

    # A raw state at least this old is no longer trusted: status is unknown.
    time_unknown = timedelta(minutes=15)

    # Status values.
    OPENED = "opened"
    CLOSED = "closed"
    UNKNOWN = "unknown"
    # Extra status value reported by `admin_status` only.
    FAKE_CLOSED = "fake_closed"

    # Cached attributes config.
    # NOTE(review): presumably attribute name -> default value, consumed by
    # CachedMixin together with `cache_prefix` -- confirm in CachedMixin.
    cached = {
        "_raw_open": False,
        "_last_update": None,
        "force_close": False,
    }
    cache_prefix = "kfetopen"

    @property
    def raw_open(self):
        """Raw open state; assigning it also refreshes `last_update`."""
        return self._raw_open

    @raw_open.setter
    def raw_open(self, value):
        # The timestamp is written first, then the raw value.
        self._last_update = timezone.now()
        self._raw_open = value

    @property
    def last_update(self):
        """Time of the last `raw_open` assignment.

        Exposed as a getter-only property so it cannot be assigned directly.
        """
        return self._last_update

    @property
    def is_open(self):
        """Open state once `force_close` is taken into account."""
        if self.force_close:
            return False
        return self.raw_open

    def status(self):
        """Return the current status.

        Returns:
            `UNKNOWN` when no update was ever recorded or when the last one
            is at least `time_unknown` old; otherwise `OPENED` or `CLOSED`
            according to `is_open`.
        """
        if self.last_update is None:
            return self.UNKNOWN
        if timezone.now() - self.last_update >= self.time_unknown:
            return self.UNKNOWN
        if self.is_open:
            return self.OPENED
        return self.CLOSED

    def admin_status(self, status=None):
        """Return the status including the `FAKE_CLOSED` distinction.

        Args:
            status: A status already computed by `status()`; when None it is
                computed here.

        Returns:
            `FAKE_CLOSED` when the status is `CLOSED` although `raw_open` is
            truthy; the status unchanged otherwise.
        """
        if status is None:
            status = self.status()
        masked = status == self.CLOSED and self.raw_open
        return self.FAKE_CLOSED if masked else status

    def _export(self):
        """Export internal state.

        Used by WS initialization and updates.

        Returns:
            (tuple): (base, team)
                - team for team users.
                - base for others.

        """
        current = self.status()
        public = {"status": current}
        # The team payload is the public one plus the restricted entries.
        team = dict(public)
        team["admin_status"] = self.admin_status(current)
        team["force_close"] = self.force_close
        return public, team

    def export(self, user):
        """Export internal state for a given user.

        Returns:
            (dict): Internal state. Only variables visible for the user are
                exported, according to its permissions.

        """
        public, team = self._export()
        if kfet_is_team(user):
            return team
        return public

    def send_ws(self):
        """Send internal state to websocket channels."""
        # Imported inside the method, as in the original code.
        from .consumers import OpenKfetConsumer

        public, team = self._export()
        payloads = (
            ("kfet.open.base", public),
            ("kfet.open.team", team),
        )
        for group, payload in payloads:
            OpenKfetConsumer.group_send(group, payload)


# Module-level instance.
kfet_open = OpenKfet()