kpsul/kfet/utils.py

102 lines
2.7 KiB
Python

import json
import math

from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from django.core.cache import cache
from django.core.serializers.json import DjangoJSONEncoder

from .config import kfet_config


def to_ukf(balance, is_cof=False):
    """Return the UKF amount corresponding to a balance in euros.

    The balance is multiplied by 10 and, when ``is_cof`` is true, further
    increased by ``kfet_config.subvention_cof`` percent. The result is
    rounded down with ``math.floor``.
    """
    cof_percent = kfet_config.subvention_cof
    if is_cof:
        multiplier = 1 + cof_percent / 100
    else:
        multiplier = 1
    return math.floor(balance * 10 * multiplier)
# Storage
class CachedMixin:
    """Object with cached properties.

    Attribute names listed in ``cached`` are not stored on the instance:
    reading one fetches ``<cache_prefix>__<name>`` from the cache, and
    assigning one writes the value to the cache under that same key. Any
    other attribute is handled by the next class in the MRO.

    Attributes:
        cached (dict): Keys are cached properties. Associated value is the
            default returned by getters in case the key is missing from
            the cache.
        cache_prefix (str): Used to prefix keys in cache.
    """

    cached = {}
    cache_prefix = ""

    def __init__(self, cache_prefix=None, *args, **kwargs):
        """Initialise the object, optionally overriding ``cache_prefix``.

        Remaining positional and keyword arguments are forwarded to the next
        ``__init__`` in the MRO. When ``cache_prefix`` is ``None`` the
        class-level prefix is kept.
        """
        super().__init__(*args, **kwargs)
        if cache_prefix is not None:
            self.cache_prefix = cache_prefix

    def cachekey(self, attr):
        """Return the cache key used to store the attribute ``attr``."""
        return "{}__{}".format(self.cache_prefix, attr)

    def __getattr__(self, attr):
        """Resolve an attribute that regular lookup did not find.

        Names in ``cached`` are read from the cache, falling back to the
        default declared in ``cached``. Other names are delegated to the
        parent ``__getattr__`` when one exists.

        Raises:
            AttributeError: If ``attr`` is not cached and no parent class
                provides ``__getattr__``.
        """
        if attr in self.cached:
            return cache.get(self.cachekey(attr), self.cached.get(attr))
        parent_getattr = getattr(super(), "__getattr__", None)
        if parent_getattr is not None:
            return parent_getattr(attr)
        # Name the class and the attribute so the failure can be diagnosed.
        raise AttributeError(
            "{!r} object has no attribute {!r}".format(type(self).__name__, attr)
        )

    def __setattr__(self, attr, value):
        """Assign ``value`` to ``attr``.

        Names in ``cached`` are written to the cache instead of the instance;
        every other name is set through the parent ``__setattr__``.

        Raises:
            AttributeError: If ``attr`` is not cached and no parent class
                provides ``__setattr__``.
        """
        if attr in self.cached:
            cache.set(self.cachekey(attr), value)
            return
        parent_setattr = getattr(super(), "__setattr__", None)
        if parent_setattr is None:
            raise AttributeError(
                "can't set attribute {!r} on {!r} object".format(
                    attr, type(self).__name__
                )
            )
        parent_setattr(attr, value)

    def clear_cache(self):
        """Delete the cache entries of every property listed in ``cached``."""
        cache.delete_many([self.cachekey(attr) for attr in self.cached])
# Consumers
class DjangoJsonWebsocketConsumer(AsyncJsonWebsocketConsumer):
    """Custom Json Websocket Consumer.

    Encode to JSON with DjangoJSONEncoder.
    """

    @classmethod
    async def encode_json(cls, content):
        """Serialize ``content`` to a JSON string without its ``"type"`` key.

        The caller's mapping is left untouched, and content that has no
        ``"type"`` key is encoded as-is instead of raising ``KeyError``.

        Args:
            content: Mapping to serialize.

        Returns:
            str: JSON produced by ``json.dumps`` with ``DjangoJSONEncoder``.
        """
        # Work on a shallow copy so the caller's dict is not mutated.
        payload = dict(content)
        # Remove the type value, only used by Channels to choose the group to
        # send to; it may legitimately be absent.
        payload.pop("type", None)
        return json.dumps(payload, cls=DjangoJSONEncoder)
class PermConsumerMixin:
    """Add support to check permissions on consumers.

    On connection, the user found in ``self.scope["user"]`` is stored as
    ``self.user`` and must hold every permission in ``perms_connect``.

    Attributes:
        perms_connect (list): Required permissions to connect to this
            consumer.
    """

    # NOTE(review): nothing in this class reads http_user; presumably a
    # leftover from an older Channels API. Confirm before removing.
    http_user = True  # Enable message.user
    perms_connect = []

    async def connect(self):
        """Check permissions on connection.

        Delegates to ``super().connect()`` when the user has all of
        ``perms_connect``; otherwise closes the connection.
        """
        self.user = self.scope["user"]
        # has_perms is synchronous and may query the database, so it is run
        # through database_sync_to_async rather than called directly from
        # the event loop.
        allowed = await database_sync_to_async(self.user.has_perms)(
            self.perms_connect
        )
        if allowed:
            await super().connect()
        else:
            await self.close()