forked from DGNum/gestioCOF
320 lines
9.7 KiB
Python
320 lines
9.7 KiB
Python
from datetime import date, datetime, time, timedelta
|
|
|
|
import pytz
|
|
from dateutil.parser import parse as dateutil_parse
|
|
from dateutil.relativedelta import relativedelta
|
|
from django.db.models import Sum
|
|
from django.utils import timezone
|
|
|
|
KFET_WAKES_UP_AT = time(7, 0)
|
|
|
|
|
|
def kfet_day(year, month, day, start_at=KFET_WAKES_UP_AT):
|
|
"""datetime wrapper with time offset."""
|
|
naive = datetime.combine(date(year, month, day), start_at)
|
|
return pytz.timezone("Europe/Paris").localize(naive, is_dst=None)
|
|
|
|
|
|
def to_kfet_day(dt, start_at=KFET_WAKES_UP_AT):
|
|
kfet_dt = kfet_day(year=dt.year, month=dt.month, day=dt.day)
|
|
if dt.time() < start_at:
|
|
kfet_dt -= timedelta(days=1)
|
|
return kfet_dt
|
|
|
|
|
|
class Scale(object):
|
|
name = None
|
|
step = None
|
|
|
|
def __init__(self, n_steps=0, begin=None, end=None, last=False, std_chunk=True):
|
|
self.std_chunk = std_chunk
|
|
if last:
|
|
end = timezone.now()
|
|
if std_chunk:
|
|
if begin is not None:
|
|
begin = self.get_chunk_start(begin)
|
|
if end is not None:
|
|
end = self.do_step(self.get_chunk_start(end))
|
|
|
|
if begin is not None and n_steps != 0:
|
|
self.begin = begin
|
|
self.end = self.do_step(self.begin, n_steps=n_steps)
|
|
elif end is not None and n_steps != 0:
|
|
self.end = end
|
|
self.begin = self.do_step(self.end, n_steps=-n_steps)
|
|
elif begin is not None and end is not None:
|
|
self.begin = begin
|
|
self.end = end
|
|
else:
|
|
raise Exception(
|
|
"Two of these args must be specified: "
|
|
"n_steps, begin, end; "
|
|
"or use last and n_steps"
|
|
)
|
|
|
|
self.datetimes = self.get_datetimes()
|
|
|
|
@staticmethod
|
|
def by_name(name):
|
|
for cls in Scale.__subclasses__():
|
|
if cls.name == name:
|
|
return cls
|
|
return None
|
|
|
|
def get_from(self, dt):
|
|
return self.std_chunk and self.get_chunk_start(dt) or dt
|
|
|
|
def __getitem__(self, i):
|
|
return self.datetimes[i], self.datetimes[i + 1]
|
|
|
|
def __len__(self):
|
|
return len(self.datetimes) - 1
|
|
|
|
def do_step(self, dt, n_steps=1):
|
|
return dt + self.step * n_steps
|
|
|
|
def get_datetimes(self):
|
|
datetimes = [self.begin]
|
|
tmp = self.begin
|
|
while tmp < self.end:
|
|
tmp = self.do_step(tmp)
|
|
datetimes.append(tmp)
|
|
return datetimes
|
|
|
|
def get_labels(self, label_fmt=None):
|
|
if label_fmt is None:
|
|
label_fmt = self.label_fmt
|
|
return [
|
|
begin.strftime(label_fmt.format(i=i, rev_i=len(self) - i))
|
|
for i, (begin, end) in enumerate(self)
|
|
]
|
|
|
|
def chunkify_qs(self, qs, field=None):
|
|
if field is None:
|
|
field = "at"
|
|
begin_f = "{}__gte".format(field)
|
|
end_f = "{}__lte".format(field)
|
|
return [qs.filter(**{begin_f: begin, end_f: end}) for begin, end in self]
|
|
|
|
def get_by_chunks(self, qs, field_callback=None, field_db="at"):
|
|
"""Objects of queryset ranked according to the scale.
|
|
|
|
Returns a generator whose each item, corresponding to a scale chunk,
|
|
is a generator of objects from qs for this chunk.
|
|
|
|
Args:
|
|
qs: Queryset of source objects, must be ordered *first* on the
|
|
same field returned by `field_callback`.
|
|
field_callback: Callable which gives value from an object used
|
|
to compare against limits of the scale chunks.
|
|
Default to: lambda obj: getattr(obj, field_db)
|
|
field_db: Used to filter against `scale` limits.
|
|
Default to 'at'.
|
|
|
|
Examples:
|
|
If queryset `qs` use `values()`, `field_callback` must be set and
|
|
could be: `lambda d: d['at']`
|
|
If `field_db` use foreign attributes (eg with `__`), it should be
|
|
something like: `lambda obj: obj.group.at`.
|
|
|
|
"""
|
|
if field_callback is None:
|
|
|
|
def field_callback(obj):
|
|
return getattr(obj, field_db)
|
|
|
|
begin_f = "{}__gte".format(field_db)
|
|
end_f = "{}__lte".format(field_db)
|
|
|
|
qs = qs.filter(**{begin_f: self.begin, end_f: self.end})
|
|
|
|
obj_iter = iter(qs)
|
|
|
|
last_obj = None
|
|
|
|
def _objects_until(obj_iter, field_callback, end):
|
|
"""Generator of objects until `end`.
|
|
|
|
Ends if objects source is empty or when an object not verifying
|
|
field_callback(obj) <= end is met.
|
|
|
|
If this object exists, it is stored in `last_obj` which is found
|
|
from outer scope.
|
|
Also, if this same variable is non-empty when the function is
|
|
called, it first yields its content.
|
|
|
|
Args:
|
|
obj_iter: Source used to get objects.
|
|
field_callback: Returned value, when it is called on an object
|
|
will be used to test ordering against `end`.
|
|
end
|
|
|
|
"""
|
|
nonlocal last_obj
|
|
|
|
if last_obj is not None:
|
|
yield last_obj
|
|
last_obj = None
|
|
|
|
for obj in obj_iter:
|
|
if field_callback(obj) <= end:
|
|
yield obj
|
|
else:
|
|
last_obj = obj
|
|
return
|
|
|
|
for begin, end in self:
|
|
# forward last seen object, if it exists, to the right chunk,
|
|
# and fill with empty generators for intermediate chunks of scale
|
|
if last_obj is not None:
|
|
if field_callback(last_obj) > end:
|
|
yield iter(())
|
|
continue
|
|
|
|
# yields generator for this chunk
|
|
# this set last_obj to None if obj_iter reach its end, otherwise
|
|
# it's set to the first met object from obj_iter which doesn't
|
|
# belong to this chunk
|
|
yield _objects_until(obj_iter, field_callback, end)
|
|
|
|
|
|
class DayScale(Scale):
|
|
name = "day"
|
|
step = timedelta(days=1)
|
|
label_fmt = "%A"
|
|
|
|
@classmethod
|
|
def get_chunk_start(cls, dt):
|
|
return to_kfet_day(dt)
|
|
|
|
|
|
class WeekScale(Scale):
|
|
name = "week"
|
|
step = timedelta(days=7)
|
|
label_fmt = "Semaine %W"
|
|
|
|
@classmethod
|
|
def get_chunk_start(cls, dt):
|
|
dt_kfet = to_kfet_day(dt)
|
|
offset = timedelta(days=dt_kfet.weekday())
|
|
return dt_kfet - offset
|
|
|
|
|
|
class MonthScale(Scale):
|
|
name = "month"
|
|
step = relativedelta(months=1)
|
|
label_fmt = "%B"
|
|
|
|
@classmethod
|
|
def get_chunk_start(cls, dt):
|
|
return to_kfet_day(dt).replace(day=1)
|
|
|
|
|
|
def stat_manifest(
|
|
scales_def=None, scale_args=None, scale_prefix=None, **other_url_params
|
|
):
|
|
if scale_prefix is None:
|
|
scale_prefix = "scale_"
|
|
if scales_def is None:
|
|
scales_def = []
|
|
if scale_args is None:
|
|
scale_args = {}
|
|
manifest = []
|
|
for label, cls in scales_def:
|
|
url_params = {scale_prefix + "name": cls.name}
|
|
url_params.update(
|
|
{scale_prefix + key: value for key, value in scale_args.items()}
|
|
)
|
|
url_params.update(other_url_params)
|
|
manifest.append(dict(label=label, url_params=url_params))
|
|
return manifest
|
|
|
|
|
|
def last_stats_manifest(
|
|
scales_def=None, scale_args=None, scale_prefix=None, **url_params
|
|
):
|
|
scales_def = [
|
|
("Derniers mois", MonthScale),
|
|
("Dernières semaines", WeekScale),
|
|
("Derniers jours", DayScale),
|
|
]
|
|
if scale_args is None:
|
|
scale_args = {}
|
|
scale_args.update(dict(last=True, n_steps=7))
|
|
return stat_manifest(
|
|
scales_def=scales_def,
|
|
scale_args=scale_args,
|
|
scale_prefix=scale_prefix,
|
|
**url_params
|
|
)
|
|
|
|
|
|
# Étant donné un queryset d'operations
|
|
# rend la somme des article_nb
|
|
def tot_ventes(queryset):
|
|
res = queryset.aggregate(Sum("article_nb"))["article_nb__sum"]
|
|
return res and res or 0
|
|
|
|
|
|
class ScaleMixin(object):
|
|
scale_args_prefix = "scale_"
|
|
|
|
def get_scale_args(self, params=None, prefix=None):
|
|
"""Retrieve scale args from params.
|
|
|
|
Should search the same args of Scale constructor.
|
|
|
|
Args:
|
|
params (dict, optional): Scale args are searched in this.
|
|
Default to GET params of request.
|
|
prefix (str, optional): Appended at the begin of scale args names.
|
|
Default to `self.scale_args_prefix`.
|
|
|
|
"""
|
|
if params is None:
|
|
params = self.request.GET
|
|
if prefix is None:
|
|
prefix = self.scale_args_prefix
|
|
|
|
scale_args = {}
|
|
|
|
name = params.get(prefix + "name", None)
|
|
if name is not None:
|
|
scale_args["name"] = name
|
|
|
|
n_steps = params.get(prefix + "n_steps", None)
|
|
if n_steps is not None:
|
|
scale_args["n_steps"] = int(n_steps)
|
|
|
|
begin = params.get(prefix + "begin", None)
|
|
if begin is not None:
|
|
scale_args["begin"] = dateutil_parse(begin)
|
|
|
|
end = params.get(prefix + "send", None)
|
|
if end is not None:
|
|
scale_args["end"] = dateutil_parse(end)
|
|
|
|
last = params.get(prefix + "last", None)
|
|
if last is not None:
|
|
scale_args["last"] = last in ["true", "True", "1"] and True or False
|
|
|
|
return scale_args
|
|
|
|
def get_context_data(self, *args, **kwargs):
|
|
context = super().get_context_data(*args, **kwargs)
|
|
|
|
scale_args = self.get_scale_args()
|
|
scale_name = scale_args.pop("name", None)
|
|
scale_cls = Scale.by_name(scale_name)
|
|
|
|
if scale_cls is None:
|
|
scale = self.get_default_scale()
|
|
else:
|
|
scale = scale_cls(**scale_args)
|
|
|
|
self.scale = scale
|
|
context["labels"] = scale.get_labels()
|
|
return context
|
|
|
|
def get_default_scale(self):
|
|
return DayScale(n_steps=7, last=True)
|