kpsul/kfet/statistic.py

328 lines
9.8 KiB
Python
Raw Normal View History

2016-12-09 21:45:34 +01:00
# -*- coding: utf-8 -*-
from datetime import date, datetime, time, timedelta
from dateutil.relativedelta import relativedelta
from dateutil.parser import parse as dateutil_parse
import pytz
2016-12-09 21:45:34 +01:00
from django.utils import timezone
2017-02-15 14:21:00 +01:00
from django.db.models import Sum
# Time of day used as the default `start_at` of the day helpers in this
# module, i.e. the moment a "kfet day" is considered to begin.
KFET_WAKES_UP_AT = time(7, 0)
def kfet_day(year, month, day, start_at=KFET_WAKES_UP_AT):
    """Build the aware datetime at which the given calendar day starts.

    Args:
        year, month, day: Calendar date.
        start_at: Time of day combined with the date.

    Returns:
        The combined date and time, localized to 'Europe/Paris'.
        `is_dst=None` makes pytz raise instead of guessing when the local
        time is ambiguous or does not exist.
    """
    paris = pytz.timezone('Europe/Paris')
    unaware = datetime.combine(date(year, month, day), start_at)
    return paris.localize(unaware, is_dst=None)
def to_kfet_day(dt, start_at=KFET_WAKES_UP_AT):
    """Return the start of the "kfet day" which contains `dt`.

    A `dt` whose time of day is earlier than `start_at` belongs to the day
    which started on the previous calendar date.

    Args:
        dt: Datetime to locate. Its date and time-of-day fields are read
            as-is; no timezone conversion is done here.
        start_at: Time of day at which a kfet day begins.

    Returns:
        The aware datetime built by `kfet_day` for the selected calendar
        date and `start_at`.
    """
    day = dt.date()
    if dt.time() < start_at:
        day -= timedelta(days=1)
    # Pick the calendar date first and localize once, forwarding `start_at`:
    # shifting an already-localized pytz datetime by a timedelta keeps the
    # old UTC offset, which is wrong when the shift crosses a DST change.
    return kfet_day(day.year, day.month, day.day, start_at=start_at)
class Scale(object):
    """Ordered sequence of consecutive datetime chunks.

    A scale is a list of boundaries (`self.datetimes`) going from
    `self.begin` up to at least `self.end`, each boundary being one `step`
    after the previous one. Item `i` of the scale is the pair
    `(datetimes[i], datetimes[i + 1])`.

    Subclasses are expected to define:
        name: Identifier used by `Scale.by_name`.
        step: Value added to a datetime to move one chunk forward.
        label_fmt: Default format used by `get_labels`.
        get_chunk_start(dt): Start of the standard chunk containing `dt`
            (only needed when `std_chunk` is true).
    """
    name = None
    step = None

    def __init__(self, n_steps=0, begin=None, end=None,
                 last=False, std_chunk=True):
        """Compute the boundaries of the scale.

        Two of `n_steps`, `begin` and `end` must be provided, or `last`
        together with `n_steps`.

        Args:
            n_steps: Number of chunks; 0 means "not specified".
            begin: Lower limit of the scale.
            end: Upper limit of the scale.
            last: If true, `end` is replaced by the current time.
            std_chunk: If true, `begin` is moved back to the start of its
                chunk and `end` is moved to the end of its chunk.

        Raises:
            ValueError: If the arguments do not define both limits.
        """
        self.std_chunk = std_chunk
        if last:
            end = timezone.now()

        if std_chunk:
            if begin is not None:
                begin = self.get_chunk_start(begin)
            if end is not None:
                end = self.do_step(self.get_chunk_start(end))

        if begin is not None and n_steps != 0:
            self.begin = begin
            self.end = self.do_step(self.begin, n_steps=n_steps)
        elif end is not None and n_steps != 0:
            self.end = end
            self.begin = self.do_step(self.end, n_steps=-n_steps)
        elif begin is not None and end is not None:
            self.begin = begin
            self.end = end
        else:
            # ValueError is still caught by callers using `except Exception`.
            raise ValueError('Two of these args must be specified: '
                             'n_steps, begin, end; '
                             'or use last and n_steps')

        self.datetimes = self.get_datetimes()

    @staticmethod
    def by_name(name):
        """Return the direct subclass of Scale named `name`, or None."""
        for cls in Scale.__subclasses__():
            if cls.name == name:
                return cls
        return None

    def get_from(self, dt):
        """Return the chunk start of `dt` if `std_chunk` is set, else `dt`."""
        return self.get_chunk_start(dt) if self.std_chunk else dt

    def __getitem__(self, i):
        """Return the `(begin, end)` limits of chunk `i`.

        Negative indices count from the last chunk. Raising IndexError past
        the last chunk is what ends a `for begin, end in scale` loop, since
        the class defines no `__iter__`.
        """
        n_chunks = len(self)
        if i < 0:
            i += n_chunks
        if not 0 <= i < n_chunks:
            raise IndexError('Scale index out of range')
        return self.datetimes[i], self.datetimes[i + 1]

    def __len__(self):
        """Number of chunks, i.e. one less than the number of boundaries."""
        return len(self.datetimes) - 1

    def do_step(self, dt, n_steps=1):
        """Return `dt` moved by `n_steps` steps (backwards if negative)."""
        return dt + self.step * n_steps

    def get_datetimes(self):
        """Return the boundaries from `begin`, stepping until `end` is reached."""
        datetimes = [self.begin]
        tmp = self.begin
        while tmp < self.end:
            tmp = self.do_step(tmp)
            datetimes.append(tmp)
        return datetimes

    def get_labels(self, label_fmt=None):
        """Return one label per chunk, built from the chunk's begin datetime.

        `label_fmt` (default: `self.label_fmt`) is first passed through
        `str.format` with `i` (chunk index) and `rev_i` (`len(self) - i`),
        then given to `strftime`.
        """
        if label_fmt is None:
            label_fmt = self.label_fmt
        return [
            begin.strftime(label_fmt.format(i=i, rev_i=len(self) - i))
            for i, (begin, end) in enumerate(self)
        ]

    def chunkify_qs(self, qs, field=None):
        """Return one filtered queryset per chunk.

        Each queryset is `qs` filtered on `field` (default 'at') with both
        limits of the chunk included (`__gte` / `__lte`).
        """
        if field is None:
            field = 'at'
        begin_f = '{}__gte'.format(field)
        end_f = '{}__lte'.format(field)
        return [
            qs.filter(**{begin_f: begin, end_f: end})
            for begin, end in self
        ]

    def get_by_chunks(self, qs, field_callback=None, field_db='at'):
        """Objects of queryset ranked according to the scale.

        Returns a generator whose each item, corresponding to a scale chunk,
        is a generator of objects from qs for this chunk.

        Each chunk generator must be consumed before the next one is
        requested: they all read from the same underlying iterator.

        Args:
            qs: Queryset of source objects, must be ordered *first* on the
                same field returned by `field_callback`.
            field_callback: Callable which gives value from an object used
                to compare against limits of the scale chunks.
                Default to: lambda obj: getattr(obj, field_db)
            field_db: Used to filter against `scale` limits.
                Default to 'at'.

        Examples:
            If queryset `qs` use `values()`, `field_callback` must be set and
            could be: `lambda d: d['at']`
            If `field_db` use foreign attributes (eg with `__`), it should be
            something like: `lambda obj: obj.group.at`.
        """
        if field_callback is None:
            def field_callback(obj):
                return getattr(obj, field_db)

        begin_f = '{}__gte'.format(field_db)
        end_f = '{}__lte'.format(field_db)

        qs = (
            qs
            .filter(**{begin_f: self.begin, end_f: self.end})
        )

        obj_iter = iter(qs)

        last_obj = None

        def _objects_until(obj_iter, field_callback, end):
            """Generator of objects until `end`.

            Ends if objects source is empty or when an object not verifying
            field_callback(obj) <= end is met.

            If this object exists, it is stored in `last_obj` which is found
            from outer scope.
            Also, if this same variable is non-empty when the function is
            called, it first yields its content.

            Args:
                obj_iter: Source used to get objects.
                field_callback: Returned value, when it is called on an object
                    will be used to test ordering against `end`.
                end: Upper limit (included) of the chunk being filled.
            """
            nonlocal last_obj

            if last_obj is not None:
                yield last_obj
                last_obj = None

            for obj in obj_iter:
                if field_callback(obj) <= end:
                    yield obj
                else:
                    last_obj = obj
                    return

        for begin, end in self:
            # forward last seen object, if it exists, to the right chunk,
            # and fill with empty generators for intermediate chunks of scale
            if last_obj is not None:
                if field_callback(last_obj) > end:
                    yield iter(())
                    continue

            # yields generator for this chunk
            # this set last_obj to None if obj_iter reach its end, otherwise
            # it's set to the first met object from obj_iter which doesn't
            # belong to this chunk
            yield _objects_until(obj_iter, field_callback, end)
class DayScale(Scale):
    """Scale whose chunks are one day long, labelled by weekday name."""
    name = 'day'
    label_fmt = '%A'
    step = timedelta(days=1)

    @classmethod
    def get_chunk_start(cls, dt):
        """Return the start of the kfet day containing `dt`."""
        day_start = to_kfet_day(dt)
        return day_start
class WeekScale(Scale):
    """Scale whose chunks are seven days long, labelled by week number."""
    name = 'week'
    label_fmt = 'Semaine %W'
    step = timedelta(days=7)

    @classmethod
    def get_chunk_start(cls, dt):
        """Return the start of the kfet day falling on the Monday of `dt`'s week.

        `weekday()` is 0 on Monday, so going back that many days from the
        kfet day of `dt` lands on the Monday.
        """
        day_start = to_kfet_day(dt)
        return day_start - timedelta(days=day_start.weekday())
class MonthScale(Scale):
    """Scale whose chunks are one calendar month long, labelled by month name."""
    name = 'month'
    label_fmt = '%B'
    step = relativedelta(months=1)

    @classmethod
    def get_chunk_start(cls, dt):
        """Return the kfet day of `dt` with its day-of-month reset to 1."""
        day_start = to_kfet_day(dt)
        return day_start.replace(day=1)
2016-12-09 21:45:34 +01:00
def stat_manifest(scales_def=None, scale_args=None, scale_prefix=None,
                  **other_url_params):
    """Build a manifest describing one stat entry per scale.

    Args:
        scales_def: Iterable of `(label, scale_cls)` pairs. Default: empty.
        scale_args: Mapping of scale arguments; each key is emitted with
            `scale_prefix` prepended. Default: empty.
        scale_prefix: Prefix of scale-related url params.
            Default: 'scale_'.
        **other_url_params: Extra url params added, unprefixed, to every
            entry.

    Returns:
        A list of dicts with keys 'label' and 'url_params'. Within
        'url_params' the scale name comes first, then the prefixed scale
        args, then the extra params (later ones override earlier ones).
    """
    prefix = 'scale_' if scale_prefix is None else scale_prefix
    definitions = [] if scales_def is None else scales_def
    args = {} if scale_args is None else scale_args

    entries = []
    for label, scale_cls in definitions:
        params = {prefix + 'name': scale_cls.name}
        for key, value in args.items():
            params[prefix + key] = value
        params.update(other_url_params)
        entries.append({'label': label, 'url_params': params})
    return entries
def last_stats_manifest(scales_def=None, scale_args=None, scale_prefix=None,
                        **url_params):
    """Build the manifest of the "last months / weeks / days" stats.

    Args:
        scales_def: Ignored; the month, week and day scales are always
            used. Kept so the signature mirrors `stat_manifest`.
        scale_args: Optional mapping of extra scale arguments. `last` and
            `n_steps` are always forced to `True` and `7`. The mapping
            passed by the caller is left untouched.
        scale_prefix: Forwarded to `stat_manifest`.
        **url_params: Forwarded to `stat_manifest`.

    Returns:
        The list returned by `stat_manifest`.
    """
    scales_def = [
        ('Derniers mois', MonthScale, ),
        ('Dernières semaines', WeekScale, ),
        ('Derniers jours', DayScale, ),
    ]
    # Work on a copy: updating the argument in place would leak `last` and
    # `n_steps` into a dict owned (and possibly reused) by the caller.
    merged_args = dict(scale_args) if scale_args is not None else {}
    merged_args.update(last=True, n_steps=7)
    return stat_manifest(scales_def=scales_def, scale_args=merged_args,
                         scale_prefix=scale_prefix, **url_params)
2016-12-09 21:45:34 +01:00
def tot_ventes(queryset):
    """Return the sum of `article_nb` over a queryset of operations.

    Returns 0 when the aggregate is empty (None) or otherwise falsy.
    """
    total = queryset.aggregate(Sum('article_nb'))['article_nb__sum']
    return total or 0
class ScaleMixin(object):
    """View mixin which builds a Scale from request params.

    The resulting scale is stored in `self.scale` and its labels are added
    to the template context under 'labels'.
    """
    # Prefix of the request params carrying scale arguments.
    scale_args_prefix = 'scale_'

    def get_scale_args(self, params=None, prefix=None):
        """Retrieve scale args from params.

        Should search the same args of Scale constructor.

        Args:
            params (dict, optional): Scale args are searched in this.
                Default to GET params of request.
            prefix (str, optional): Appended at the begin of scale args names.
                Default to `self.scale_args_prefix`.

        Returns:
            A dict holding only the args found in `params`, converted:
            'name' (str), 'n_steps' (int), 'begin' and 'end' (parsed
            datetimes), 'last' (bool, true for 'true', 'True' or '1').
        """
        if params is None:
            params = self.request.GET
        if prefix is None:
            prefix = self.scale_args_prefix

        scale_args = {}

        name = params.get(prefix + 'name', None)
        if name is not None:
            scale_args['name'] = name

        n_steps = params.get(prefix + 'n_steps', None)
        if n_steps is not None:
            scale_args['n_steps'] = int(n_steps)

        begin = params.get(prefix + 'begin', None)
        if begin is not None:
            scale_args['begin'] = dateutil_parse(begin)

        # The param is named after the Scale constructor argument: 'end'.
        end = params.get(prefix + 'end', None)
        if end is not None:
            scale_args['end'] = dateutil_parse(end)

        last = params.get(prefix + 'last', None)
        if last is not None:
            scale_args['last'] = last in ('true', 'True', '1')

        return scale_args

    def get_context_data(self, *args, **kwargs):
        """Add the scale labels to the context and keep the scale on `self`.

        The scale class is looked up from the 'name' scale arg; when no
        class matches, `get_default_scale()` is used and the other scale
        args are ignored.
        """
        context = super().get_context_data(*args, **kwargs)

        scale_args = self.get_scale_args()
        scale_name = scale_args.pop('name', None)
        scale_cls = Scale.by_name(scale_name)

        if scale_cls is None:
            scale = self.get_default_scale()
        else:
            scale = scale_cls(**scale_args)

        self.scale = scale
        context['labels'] = scale.get_labels()
        return context

    def get_default_scale(self):
        """Return the fallback scale: the last 7 days."""
        return DayScale(n_steps=7, last=True)