kpsul/kfet/utils.py

122 lines
3.8 KiB
Python

import json
import math

from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from django.core.cache import cache
from django.core.serializers.json import DjangoJSONEncoder

from .config import kfet_config
def to_ukf(balance, is_cof=False):
    """Return the UKF amount corresponding to a balance in euros.

    The balance is multiplied by 10. When ``is_cof`` is true it is further
    increased by the ``kfet_config.subvention_cof`` percentage. The result is
    rounded down with ``math.floor``.
    """
    # Read unconditionally, exactly like the original implementation.
    cof_rate = kfet_config.subvention_cof
    if is_cof:
        multiplier = 1 + cof_rate / 100
    else:
        multiplier = 1
    return math.floor(balance * 10 * multiplier)
# Storage
class CachedMixin:
    """Object with cached properties.

    Attribute names listed in ``cached`` are not stored on the instance:
    reading them fetches the value from the Django cache and assigning them
    writes the value to the Django cache. Every other attribute behaves as
    usual.

    Attributes:
        cached (dict): Keys are cached properties. Associated value is the
            returned default by getters in case the key is missing from cache.
        cache_prefix (str): Used to prefix keys in cache.
    """

    # Meant to be overridden by subclasses; never mutated by this class.
    cached = {}
    cache_prefix = ""

    def __init__(self, cache_prefix=None, *args, **kwargs):
        """Initialize the mixin.

        Args:
            cache_prefix: Optional per-instance override of the class-level
                ``cache_prefix``.
            *args, **kwargs: Forwarded to the next ``__init__`` in the MRO.
        """
        super().__init__(*args, **kwargs)
        if cache_prefix is not None:
            self.cache_prefix = cache_prefix

    def cachekey(self, attr):
        """Return the cache key used to store the property ``attr``."""
        return "{}__{}".format(self.cache_prefix, attr)

    def __getattr__(self, attr):
        """Resolve attributes that normal lookup did not find.

        Cached properties are read from the cache, falling back to the default
        declared in ``cached``. Other names are delegated to a parent
        ``__getattr__`` when one exists.

        Raises:
            AttributeError: If ``attr`` is neither a cached property nor
                handled by a parent class.
        """
        if attr in self.cached:
            return cache.get(self.cachekey(attr), self.cached.get(attr))
        parent = super()
        if hasattr(parent, "__getattr__"):
            return parent.__getattr__(attr)
        # Name the class and the attribute, like the built-in message does.
        raise AttributeError(
            "'{}' object has no attribute '{}'".format(type(self).__name__, attr)
        )

    def __setattr__(self, attr, value):
        """Store cached properties in the cache, other attributes normally."""
        if attr in self.cached:
            cache.set(self.cachekey(attr), value)
        else:
            # ``object`` always provides ``__setattr__``, so delegation can
            # never be unavailable here.
            super().__setattr__(attr, value)

    def clear_cache(self):
        """Delete every cached property of this object from the cache."""
        cache.delete_many([self.cachekey(attr) for attr in self.cached])
# Consumers
class DjangoJsonWebsocketConsumer(AsyncJsonWebsocketConsumer):
    """JSON websocket consumer whose encoder is ``DjangoJSONEncoder``."""

    @classmethod
    async def encode_json(cls, content):
        """Serialize ``content`` to a JSON string with ``DjangoJSONEncoder``."""
        serialized = json.dumps(content, cls=DjangoJSONEncoder)
        return serialized
class PermConsumerMixin:
    """Add support to check permissions on consumers.

    Attributes:
        perms_connect (list): Required permissions to connect to this
            consumer.
        user: Set by ``connect()`` from ``self.scope["user"]``.

    NOTE(review): an earlier version of this docstring stated that the user
    is passed to each ``connection_groups`` call; the code doing so is
    currently commented out below this class.
    """

    # NOTE(review): nothing in this module reads ``http_user``; it looks like
    # a leftover from an older Channels API. Confirm before removing.
    http_user = True  # Enable message.user
    perms_connect = []

    async def connect(self):
        """Check permissions on connection.

        The connection is handed to the next ``connect()`` in the MRO when the
        scope user holds every permission in ``perms_connect``; otherwise it
        is closed.
        """
        self.user = self.scope["user"]
        # ``has_perms`` is synchronous and may query the database, which
        # Django refuses to do from a running event loop. Run it in a worker
        # thread through Channels' database-aware wrapper.
        allowed = await database_sync_to_async(self.user.has_perms)(
            self.perms_connect
        )
        if allowed:
            await super().connect()
        else:
            await self.close()
# async def raw_connect(self, message, **kwargs):
# # Same as original raw_connect method of JsonWebsocketConsumer
# # We add user to connection_groups call.
# groups = self.connection_groups(user=message.user, **kwargs)
# for group in groups:
# await self.channel_layer.group_add(group, message.reply_channel)
# # Group(group, channel_layer=message.channel_layer).add(message.reply_channel)
# self.connect(message, **kwargs)
#
# async def raw_disconnect(self, message, **kwargs):
# # Same as original raw_disconnect method of JsonWebsocketConsumer
# # We add user to connection_groups call.
# groups = self.connection_groups(user=message.user, **kwargs)
# for group in groups:
# await self.channel_layer.group_discard(group, message.reply_channel)
# # Group(group, channel_layer=message.channel_layer).discard(
# # message.reply_channel
# # )
# self.disconnect(message, **kwargs)
#
# def connection_groups(self, user, **kwargs):
# """`message.user` is available as `user` arg. Original behavior."""
# return super().connection_groups(user=user, **kwargs)