kpsul/kfet/open/open.py
Aurélien Delobelle f58f120e7a py34 compat
(already present in prod)
2018-01-10 17:25:07 +01:00

110 lines
2.9 KiB
Python

from datetime import timedelta
from django.utils import timezone
from ..decorators import kfet_is_team
from ..utils import CachedMixin
class OpenKfet(CachedMixin, object):
    """Track the "open" status of a place.

    Two kinds of values are kept: the raw reading (e.g. sent by a raspberry)
    and values set by users (such as `force_close`).

    The current state persists through the cache. Using a different
    `cache_prefix` allows a different place to be managed.
    """

    # Once the last update is at least this old, the status is unknown.
    time_unknown = timedelta(minutes=15)

    # Possible values returned by `status()`.
    OPENED = 'opened'
    CLOSED = 'closed'
    UNKNOWN = 'unknown'

    # Extra value that only `admin_status()` can return.
    FAKE_CLOSED = 'fake_closed'

    # Cached attributes configuration (attribute name -> default value).
    cached = {
        '_raw_open': False,
        '_last_update': None,
        'force_close': False,
    }
    cache_prefix = 'kfetopen'

    @property
    def raw_open(self):
        """Raw open value; a property so that writes refresh `last_update`."""
        return self._raw_open

    @raw_open.setter
    def raw_open(self, new_value):
        # The timestamp is written first, then the raw value.
        self._last_update = timezone.now()
        self._raw_open = new_value

    @property
    def last_update(self):
        """Read-only access to the time of the last `raw_open` write."""
        return self._last_update

    @property
    def is_open(self):
        """Raw open value, overridden to False while `force_close` is set."""
        if self.force_close:
            return False
        return self.raw_open

    def status(self):
        """Return the public status: OPENED, CLOSED or UNKNOWN.

        UNKNOWN is returned when no update was ever recorded, or when the
        last one is at least `time_unknown` old.
        """
        if self.last_update is None:
            return self.UNKNOWN
        elapsed = timezone.now() - self.last_update
        if elapsed >= self.time_unknown:
            return self.UNKNOWN
        if self.is_open:
            return self.OPENED
        return self.CLOSED

    def admin_status(self, status=None):
        """Return the status as shown to the team.

        Args:
            status: An already computed public status; when None, it is
                obtained from `status()`.

        Returns:
            FAKE_CLOSED when the status is CLOSED although the raw value is
            truthy; otherwise the status unchanged.
        """
        current = self.status() if status is None else status
        fake = current == self.CLOSED and self.raw_open
        return self.FAKE_CLOSED if fake else current

    def _export(self):
        """Export internal state.

        Used by WS initialization and updates.

        Returns:
            (tuple): (base, team)
                - base is for regular users.
                - team is for team users: base plus restricted entries.
        """
        status = self.status()
        base = {'status': status}
        team = dict(
            base,
            admin_status=self.admin_status(status),
            force_close=self.force_close,
        )
        return base, team

    def export(self, user):
        """Export internal state for a given user.

        Returns:
            (dict): Internal state, limited to the variables the user is
                allowed to see (team export if `kfet_is_team(user)`).
        """
        base, team = self._export()
        if kfet_is_team(user):
            return team
        return base

    def send_ws(self):
        """Send internal state to the websocket channel groups."""
        # Imported locally, presumably to avoid a circular import — confirm.
        from .consumers import OpenKfetConsumer

        base, team = self._export()
        targets = (
            ('kfet.open.base', base),
            ('kfet.open.team', team),
        )
        for group, payload in targets:
            OpenKfetConsumer.group_send(group, payload)
# Module-level instance, created at import time with no constructor arguments.
kfet_open = OpenKfet()