from datetime import date, datetime, time, timedelta from dateutil.relativedelta import relativedelta from dateutil.parser import parse as dateutil_parse import pytz from django.utils import timezone from django.db.models import Sum KFET_WAKES_UP_AT = time(7, 0) def kfet_day(year, month, day, start_at=KFET_WAKES_UP_AT): """datetime wrapper with time offset.""" naive = datetime.combine(date(year, month, day), start_at) return pytz.timezone('Europe/Paris').localize(naive, is_dst=None) def to_kfet_day(dt, start_at=KFET_WAKES_UP_AT): kfet_dt = kfet_day(year=dt.year, month=dt.month, day=dt.day) if dt.time() < start_at: kfet_dt -= timedelta(days=1) return kfet_dt class Scale(object): name = None step = None def __init__(self, n_steps=0, begin=None, end=None, last=False, std_chunk=True): self.std_chunk = std_chunk if last: end = timezone.now() if std_chunk: if begin is not None: begin = self.get_chunk_start(begin) if end is not None: end = self.do_step(self.get_chunk_start(end)) if begin is not None and n_steps != 0: self.begin = begin self.end = self.do_step(self.begin, n_steps=n_steps) elif end is not None and n_steps != 0: self.end = end self.begin = self.do_step(self.end, n_steps=-n_steps) elif begin is not None and end is not None: self.begin = begin self.end = end else: raise Exception('Two of these args must be specified: ' 'n_steps, begin, end; ' 'or use last and n_steps') self.datetimes = self.get_datetimes() @staticmethod def by_name(name): for cls in Scale.__subclasses__(): if cls.name == name: return cls return None def get_from(self, dt): return self.std_chunk and self.get_chunk_start(dt) or dt def __getitem__(self, i): return self.datetimes[i], self.datetimes[i+1] def __len__(self): return len(self.datetimes) - 1 def do_step(self, dt, n_steps=1): return dt + self.step * n_steps def get_datetimes(self): datetimes = [self.begin] tmp = self.begin while tmp < self.end: tmp = self.do_step(tmp) datetimes.append(tmp) return datetimes def get_labels(self, label_fmt=None): if label_fmt is None: label_fmt = self.label_fmt return [ begin.strftime(label_fmt.format(i=i, rev_i=len(self)-i)) for i, (begin, end) in enumerate(self) ] def chunkify_qs(self, qs, field=None): if field is None: field = 'at' begin_f = '{}__gte'.format(field) end_f = '{}__lte'.format(field) return [ qs.filter(**{begin_f: begin, end_f: end}) for begin, end in self ] def get_by_chunks(self, qs, field_callback=None, field_db='at'): """Objects of queryset ranked according to the scale. Returns a generator whose each item, corresponding to a scale chunk, is a generator of objects from qs for this chunk. Args: qs: Queryset of source objects, must be ordered *first* on the same field returned by `field_callback`. field_callback: Callable which gives value from an object used to compare against limits of the scale chunks. Default to: lambda obj: getattr(obj, field_db) field_db: Used to filter against `scale` limits. Default to 'at'. Examples: If queryset `qs` use `values()`, `field_callback` must be set and could be: `lambda d: d['at']` If `field_db` use foreign attributes (eg with `__`), it should be something like: `lambda obj: obj.group.at`. """ if field_callback is None: def field_callback(obj): return getattr(obj, field_db) begin_f = '{}__gte'.format(field_db) end_f = '{}__lte'.format(field_db) qs = ( qs .filter(**{begin_f: self.begin, end_f: self.end}) ) obj_iter = iter(qs) last_obj = None def _objects_until(obj_iter, field_callback, end): """Generator of objects until `end`. Ends if objects source is empty or when an object not verifying field_callback(obj) <= end is met. If this object exists, it is stored in `last_obj` which is found from outer scope. Also, if this same variable is non-empty when the function is called, it first yields its content. Args: obj_iter: Source used to get objects. field_callback: Returned value, when it is called on an object will be used to test ordering against `end`. end """ nonlocal last_obj if last_obj is not None: yield last_obj last_obj = None for obj in obj_iter: if field_callback(obj) <= end: yield obj else: last_obj = obj return for begin, end in self: # forward last seen object, if it exists, to the right chunk, # and fill with empty generators for intermediate chunks of scale if last_obj is not None: if field_callback(last_obj) > end: yield iter(()) continue # yields generator for this chunk # this set last_obj to None if obj_iter reach its end, otherwise # it's set to the first met object from obj_iter which doesn't # belong to this chunk yield _objects_until(obj_iter, field_callback, end) class DayScale(Scale): name = 'day' step = timedelta(days=1) label_fmt = '%A' @classmethod def get_chunk_start(cls, dt): return to_kfet_day(dt) class WeekScale(Scale): name = 'week' step = timedelta(days=7) label_fmt = 'Semaine %W' @classmethod def get_chunk_start(cls, dt): dt_kfet = to_kfet_day(dt) offset = timedelta(days=dt_kfet.weekday()) return dt_kfet - offset class MonthScale(Scale): name = 'month' step = relativedelta(months=1) label_fmt = '%B' @classmethod def get_chunk_start(cls, dt): return to_kfet_day(dt).replace(day=1) def stat_manifest(scales_def=None, scale_args=None, scale_prefix=None, **other_url_params): if scale_prefix is None: scale_prefix = 'scale_' if scales_def is None: scales_def = [] if scale_args is None: scale_args = {} manifest = [] for label, cls in scales_def: url_params = {scale_prefix+'name': cls.name} url_params.update({scale_prefix+key: value for key, value in scale_args.items()}) url_params.update(other_url_params) manifest.append(dict( label=label, url_params=url_params, )) return manifest def last_stats_manifest(scales_def=None, scale_args=None, scale_prefix=None, **url_params): scales_def = [ ('Derniers mois', MonthScale, ), ('Dernières semaines', WeekScale, ), ('Derniers jours', DayScale, ), ] if scale_args is None: scale_args = {} scale_args.update(dict( last=True, n_steps=7, )) return stat_manifest(scales_def=scales_def, scale_args=scale_args, scale_prefix=scale_prefix, **url_params) # Étant donné un queryset d'operations # rend la somme des article_nb def tot_ventes(queryset): res = queryset.aggregate(Sum('article_nb'))['article_nb__sum'] return res and res or 0 class ScaleMixin(object): scale_args_prefix = 'scale_' def get_scale_args(self, params=None, prefix=None): """Retrieve scale args from params. Should search the same args of Scale constructor. Args: params (dict, optional): Scale args are searched in this. Default to GET params of request. prefix (str, optional): Appended at the begin of scale args names. Default to `self.scale_args_prefix`. """ if params is None: params = self.request.GET if prefix is None: prefix = self.scale_args_prefix scale_args = {} name = params.get(prefix+'name', None) if name is not None: scale_args['name'] = name n_steps = params.get(prefix+'n_steps', None) if n_steps is not None: scale_args['n_steps'] = int(n_steps) begin = params.get(prefix+'begin', None) if begin is not None: scale_args['begin'] = dateutil_parse(begin) end = params.get(prefix+'send', None) if end is not None: scale_args['end'] = dateutil_parse(end) last = params.get(prefix+'last', None) if last is not None: scale_args['last'] = ( last in ['true', 'True', '1'] and True or False) return scale_args def get_context_data(self, *args, **kwargs): context = super().get_context_data(*args, **kwargs) scale_args = self.get_scale_args() scale_name = scale_args.pop('name', None) scale_cls = Scale.by_name(scale_name) if scale_cls is None: scale = self.get_default_scale() else: scale = scale_cls(**scale_args) self.scale = scale context['labels'] = scale.get_labels() return context def get_default_scale(self): return DayScale(n_steps=7, last=True)