from datetime import date, datetime, time, timedelta from dateutil.parser import parse as dateutil_parse from dateutil.relativedelta import relativedelta from django.utils import timezone KFET_WAKES_UP_AT = time(5, 0) # La K-Fêt ouvre à 5h (UTC) du matin def kfet_day(year, month, day, start_at=KFET_WAKES_UP_AT): """Étant donné une date, renvoie un objet `datetime` correspondant au début du 'jour K-Fêt' correspondant.""" return datetime.combine(date(year, month, day), start_at) def to_kfet_day(dt, start_at=KFET_WAKES_UP_AT): """ Retourne le 'jour K-Fêt' correspondant à un objet `datetime` donné """ kfet_dt = kfet_day(year=dt.year, month=dt.month, day=dt.day) if dt.time() < start_at: kfet_dt -= timedelta(days=1) return kfet_dt class Scale(object): """ Classe utilisée pour subdiviser un QuerySet (e.g. des opérations) sur une échelle de temps donnée, avec un pas de temps fixe. Cette échelle peut être spécifiée : - par un début et une fin, - par un début/une fin et un nombre de subdivisions. Si le booléen `std_chunk` est activé, le début de la première subdivision est généré via la fonction `get_chunk_start`. """ name = None step = None def __init__(self, n_steps=0, begin=None, end=None, last=False, std_chunk=True): self.std_chunk = std_chunk if last: end = timezone.now() if std_chunk: if begin is not None: begin = self.get_chunk_start(begin) if end is not None: end = self.do_step(self.get_chunk_start(end)) if begin is not None and n_steps != 0: self.begin = begin self.end = self.do_step(self.begin, n_steps=n_steps) elif end is not None and n_steps != 0: self.end = end self.begin = self.do_step(self.end, n_steps=-n_steps) elif begin is not None and end is not None: self.begin = begin self.end = end else: raise Exception( "Two of these args must be specified: " "n_steps, begin, end; " "or use last and n_steps" ) self._gen_datetimes() @staticmethod def by_name(name): for cls in Scale.__subclasses__(): if cls.name == name: return cls return None def __getitem__(self, i): return self.datetimes[i], self.datetimes[i + 1] def __len__(self): return len(self.datetimes) - 1 def do_step(self, dt, n_steps=1): return dt + self.step * n_steps def _gen_datetimes(self): datetimes = [self.begin] tmp = self.begin while tmp < self.end: tmp = self.do_step(tmp) datetimes.append(tmp) self.datetimes = datetimes def get_labels(self, label_fmt=None): if label_fmt is None: label_fmt = self.label_fmt return [ begin.strftime(label_fmt.format(i=i, rev_i=len(self) - i)) for i, (begin, end) in enumerate(self) ] def chunkify_qs(self, qs, field="at", aggregate=None): """ Découpe un queryset en subdivisions, avec agrégation optionnelle des résultats NB : on pourrait faire ça en une requête, au détriment de la lisibilité... """ begin_f = "{}__gte".format(field) end_f = "{}__lte".format(field) chunks = [qs.filter(**{begin_f: begin, end_f: end}) for begin, end in self] if aggregate is None: return chunks else: return [chunk.aggregate(agg=aggregate)["agg"] or 0 for chunk in chunks] class DayScale(Scale): name = "day" step = timedelta(days=1) label_fmt = "%A" @classmethod def get_chunk_start(cls, dt): return to_kfet_day(dt) class WeekScale(Scale): name = "week" step = timedelta(days=7) label_fmt = "%d %b." @classmethod def get_chunk_start(cls, dt): dt_kfet = to_kfet_day(dt) offset = timedelta(days=dt_kfet.weekday()) return dt_kfet - offset class MonthScale(Scale): name = "month" step = relativedelta(months=1) label_fmt = "%B" @classmethod def get_chunk_start(cls, dt): return to_kfet_day(dt).replace(day=1) def scale_url_params(scales_def, **other_url_params): """ Convertit une spécification de scales en arguments GET utilisables par ScaleMixin. La spécification est de la forme suivante : - scales_def : liste de champs de la forme (label, scale) - scale_args : arguments à passer à Scale.__init__ - other_url_params : paramètres GET supplémentaires """ params_list = [] for label, cls, params, default in scales_def: url_params = {"scale_name": cls.name} url_params.update({"scale_" + key: value for key, value in params.items()}) url_params.update(other_url_params) params_list.append(dict(label=label, url_params=url_params, default=default)) return params_list class ScaleMixin(object): def parse_scale_args(self): """ Récupère les paramètres de subdivision encodés dans une requête GET. """ scale_args = {} name = self.request.GET.get("scale_name", None) if name is not None: scale_args["name"] = name n_steps = self.request.GET.get("scale_n_steps", None) if n_steps is not None: scale_args["n_steps"] = int(n_steps) begin = self.request.GET.get("scale_begin", None) if begin is not None: scale_args["begin"] = dateutil_parse(begin) end = self.request.GET.get("scale_send", None) if end is not None: scale_args["end"] = dateutil_parse(end) last = self.request.GET.get("scale_last", None) if last is not None: scale_args["last"] = last in ["true", "True", "1"] and True or False return scale_args def get_context_data(self, *args, **kwargs): # On n'hérite pas scale_args = self.parse_scale_args() scale_name = scale_args.pop("name", None) scale_cls = Scale.by_name(scale_name) if scale_cls is None: self.scale = self.get_default_scale() else: self.scale = scale_cls(**scale_args) return {"labels": self.scale.get_labels()} def get_default_scale(self): return DayScale(n_steps=7, last=True)