kpsul/kfet/statistic.py
2020-05-08 11:14:32 +02:00

211 lines
6.4 KiB
Python

from datetime import date, datetime, time, timedelta
from dateutil.parser import parse as dateutil_parse
from dateutil.relativedelta import relativedelta
from django.utils import timezone
KFET_WAKES_UP_AT = time(5, 0) # La K-Fêt ouvre à 5h (UTC) du matin
def kfet_day(year, month, day, start_at=KFET_WAKES_UP_AT):
"""Étant donné une date, renvoie un objet `datetime`
correspondant au début du 'jour K-Fêt' correspondant."""
return datetime.combine(date(year, month, day), start_at)
def to_kfet_day(dt, start_at=KFET_WAKES_UP_AT):
"""
Retourne le 'jour K-Fêt' correspondant à un objet `datetime` donné
"""
kfet_dt = kfet_day(year=dt.year, month=dt.month, day=dt.day)
if dt.time() < start_at:
kfet_dt -= timedelta(days=1)
return kfet_dt
class Scale(object):
"""
Classe utilisée pour subdiviser un QuerySet (e.g. des opérations) sur
une échelle de temps donnée, avec un pas de temps fixe.
Cette échelle peut être spécifiée :
- par un début et une fin,
- par un début/une fin et un nombre de subdivisions.
Si le booléen `std_chunk` est activé, le début de la première subdivision
est généré via la fonction `get_chunk_start`.
"""
name = None
step = None
def __init__(self, n_steps=0, begin=None, end=None, last=False, std_chunk=True):
self.std_chunk = std_chunk
if last:
end = timezone.now()
if std_chunk:
if begin is not None:
begin = self.get_chunk_start(begin)
if end is not None:
end = self.do_step(self.get_chunk_start(end))
if begin is not None and n_steps != 0:
self.begin = begin
self.end = self.do_step(self.begin, n_steps=n_steps)
elif end is not None and n_steps != 0:
self.end = end
self.begin = self.do_step(self.end, n_steps=-n_steps)
elif begin is not None and end is not None:
self.begin = begin
self.end = end
else:
raise Exception(
"Two of these args must be specified: "
"n_steps, begin, end; "
"or use last and n_steps"
)
self._gen_datetimes()
@staticmethod
def by_name(name):
for cls in Scale.__subclasses__():
if cls.name == name:
return cls
return None
def __getitem__(self, i):
return self.datetimes[i], self.datetimes[i + 1]
def __len__(self):
return len(self.datetimes) - 1
def do_step(self, dt, n_steps=1):
return dt + self.step * n_steps
def _gen_datetimes(self):
datetimes = [self.begin]
tmp = self.begin
while tmp < self.end:
tmp = self.do_step(tmp)
datetimes.append(tmp)
self.datetimes = datetimes
def get_labels(self, label_fmt=None):
if label_fmt is None:
label_fmt = self.label_fmt
return [
begin.strftime(label_fmt.format(i=i, rev_i=len(self) - i))
for i, (begin, end) in enumerate(self)
]
def chunkify_qs(self, qs, field="at", aggregate=None):
"""
Découpe un queryset en subdivisions, avec agrégation optionnelle des résultats
NB : on pourrait faire ça en une requête, au détriment de la lisibilité...
"""
begin_f = "{}__gte".format(field)
end_f = "{}__lte".format(field)
chunks = [qs.filter(**{begin_f: begin, end_f: end}) for begin, end in self]
if aggregate is None:
return chunks
else:
return [chunk.aggregate(agg=aggregate)["agg"] or 0 for chunk in chunks]
class DayScale(Scale):
name = "day"
step = timedelta(days=1)
label_fmt = "%A"
@classmethod
def get_chunk_start(cls, dt):
return to_kfet_day(dt)
class WeekScale(Scale):
name = "week"
step = timedelta(days=7)
label_fmt = "%d %b."
@classmethod
def get_chunk_start(cls, dt):
dt_kfet = to_kfet_day(dt)
offset = timedelta(days=dt_kfet.weekday())
return dt_kfet - offset
class MonthScale(Scale):
name = "month"
step = relativedelta(months=1)
label_fmt = "%B"
@classmethod
def get_chunk_start(cls, dt):
return to_kfet_day(dt).replace(day=1)
def scale_url_params(scales_def, **other_url_params):
"""
Convertit une spécification de scales en arguments GET utilisables par ScaleMixin.
La spécification est de la forme suivante :
- scales_def : liste de champs de la forme (label, scale)
- scale_args : arguments à passer à Scale.__init__
- other_url_params : paramètres GET supplémentaires
"""
params_list = []
for label, cls, params, default in scales_def:
url_params = {"scale_name": cls.name}
url_params.update({"scale_" + key: value for key, value in params.items()})
url_params.update(other_url_params)
params_list.append(dict(label=label, url_params=url_params, default=default))
return params_list
class ScaleMixin(object):
def parse_scale_args(self):
"""
Récupère les paramètres de subdivision encodés dans une requête GET.
"""
scale_args = {}
name = self.request.GET.get("scale_name", None)
if name is not None:
scale_args["name"] = name
n_steps = self.request.GET.get("scale_n_steps", None)
if n_steps is not None:
scale_args["n_steps"] = int(n_steps)
begin = self.request.GET.get("scale_begin", None)
if begin is not None:
scale_args["begin"] = dateutil_parse(begin)
end = self.request.GET.get("scale_send", None)
if end is not None:
scale_args["end"] = dateutil_parse(end)
last = self.request.GET.get("scale_last", None)
if last is not None:
scale_args["last"] = last in ["true", "True", "1"] and True or False
return scale_args
def get_context_data(self, *args, **kwargs):
# On n'hérite pas
scale_args = self.parse_scale_args()
scale_name = scale_args.pop("name", None)
scale_cls = Scale.by_name(scale_name)
if scale_cls is None:
self.scale = self.get_default_scale()
else:
self.scale = scale_cls(**scale_args)
return {"labels": self.scale.get_labels()}
def get_default_scale(self):
return DayScale(n_steps=7, last=True)