forked from DGNum/gestioCOF
f58f120e7a
(already present in prod)
109 lines
2.9 KiB
Python
109 lines
2.9 KiB
Python
from datetime import timedelta
|
|
|
|
from django.utils import timezone
|
|
|
|
from ..decorators import kfet_is_team
|
|
from ..utils import CachedMixin
|
|
|
|
|
|
class OpenKfet(CachedMixin, object):
    """Track whether a place is currently "open".

    Two kinds of values are kept: the raw reading (e.g. pushed by a
    raspberry) and values set by users, such as `force_close`.

    Using a different `cache_prefix` lets several places be managed
    independently. The current state is persisted through the cache.

    """
    # Once the last raw update is at least this old, the status is reported
    # as unknown.
    time_unknown = timedelta(minutes=15)

    # Values returned by `status()`.
    OPENED = 'opened'
    CLOSED = 'closed'
    UNKNOWN = 'unknown'
    # Extra value that only `admin_status()` can return.
    FAKE_CLOSED = 'fake_closed'

    # Cached attributes configuration.
    # NOTE(review): presumably consumed by CachedMixin as
    # {attribute name: default value} -- confirm against CachedMixin.
    cached = {
        '_raw_open': False,
        '_last_update': None,
        'force_close': False,
    }
    cache_prefix = 'kfetopen'

    @property
    def raw_open(self):
        """Raw open flag.

        Exposed as a property so that assigning it also refreshes
        `last_update`.
        """
        return self._raw_open

    @raw_open.setter
    def raw_open(self, value):
        # Record the time of the update before storing the new raw value.
        self._last_update = timezone.now()
        self._raw_open = value

    @property
    def last_update(self):
        """Time of the last `raw_open` assignment.

        No setter is defined, so it cannot be assigned through this name.
        """
        return self._last_update

    @property
    def is_open(self):
        """Open flag after applying `force_close`."""
        if self.force_close:
            return False
        return self.raw_open

    def status(self):
        """Return the current status.

        Returns:
            UNKNOWN when no update was ever recorded or when the last one is
            at least `time_unknown` old; otherwise OPENED or CLOSED,
            depending on `is_open`.

        """
        if self.last_update is None:
            return self.UNKNOWN
        elapsed = timezone.now() - self.last_update
        if elapsed >= self.time_unknown:
            return self.UNKNOWN
        if self.is_open:
            return self.OPENED
        return self.CLOSED

    def admin_status(self, status=None):
        """Return the status with the forced-close case made visible.

        Args:
            status: An already computed status. When None, `status()` is
                called to get it.

        Returns:
            FAKE_CLOSED when the status is CLOSED while `raw_open` is truthy;
            the status itself otherwise.

        """
        current = self.status() if status is None else status
        is_closed = current == self.CLOSED
        if is_closed and self.raw_open:
            return self.FAKE_CLOSED
        return current

    def _export(self):
        """Build the exported views of the internal state.

        Used by WS initialization and updates.

        Returns:
            (tuple): (base, team)
                - base only holds the status, for regular users.
                - team holds base plus the admin status and `force_close`,
                  for team users.

        """
        current = self.status()
        public = {
            'status': current,
        }
        team_only = {
            'admin_status': self.admin_status(current),
            'force_close': self.force_close,
        }
        # The team view starts as a copy of the public one, then gains the
        # restricted keys; the public dict itself is left untouched.
        team = dict(public)
        team.update(team_only)
        return public, team

    def export(self, user):
        """Return the exported state that `user` is allowed to see.

        Returns:
            (dict): The team view when `kfet_is_team(user)` is truthy, the
                base view otherwise.

        """
        public, team = self._export()
        if kfet_is_team(user):
            return team
        return public

    def send_ws(self):
        """Push the internal state to the websocket groups."""
        # Imported here rather than at module level, as in the original.
        from .consumers import OpenKfetConsumer

        public, team = self._export()
        payloads = (
            ('kfet.open.base', public),
            ('kfet.open.team', team),
        )
        for group, payload in payloads:
            OpenKfetConsumer.group_send(group, payload)
|
|
|
|
|
|
# Module-level OpenKfet instance, constructed without arguments.
kfet_open = OpenKfet()
|