import json
import math

from channels.db import database_sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from django.core.cache import cache
from django.core.serializers.json import DjangoJSONEncoder

from .config import kfet_config


def to_ukf(balance, is_cof=False):
    """Convert euro to UKF.

    Args:
        balance: Amount in euros.
        is_cof (bool): When true, the amount is increased by
            ``kfet_config.subvention_cof`` percent before rounding.

    Returns:
        int: ``balance * 10`` scaled by the grant factor, rounded down.
    """
    subvention = kfet_config.subvention_cof
    grant = (1 + subvention / 100) if is_cof else 1
    return math.floor(balance * 10 * grant)


# Storage


class CachedMixin:
    """Object with cached properties.

    Attributes listed in ``cached`` are never stored on the instance: writes
    go to the Django cache and reads come back from it, under a key built
    from ``cache_prefix`` and the attribute name.

    Attributes:
        cached (dict): Keys are cached properties. Associated value is the
            returned default by getters in case the key is missing from cache.
        cache_prefix (str): Used to prefix keys in cache.
    """

    cached = {}
    cache_prefix = ""

    def __init__(self, cache_prefix=None, *args, **kwargs):
        """Initialise the mixin, optionally overriding the class-level prefix.

        Args:
            cache_prefix (str | None): Per-instance prefix for cache keys.
                When ``None`` the class attribute ``cache_prefix`` is used.
            *args: Forwarded to the next ``__init__`` in the MRO.
            **kwargs: Forwarded to the next ``__init__`` in the MRO.
        """
        super().__init__(*args, **kwargs)
        if cache_prefix is not None:
            self.cache_prefix = cache_prefix

    def cachekey(self, attr):
        """Return the cache key under which ``attr`` is stored."""
        return "{}__{}".format(self.cache_prefix, attr)

    def __getattr__(self, attr):
        """Resolve attributes that normal lookup did not find.

        Cached properties are read from the cache, falling back to the default
        declared in ``cached``. Anything else is delegated to the parent
        ``__getattr__`` when one exists.

        Raises:
            AttributeError: If ``attr`` is not a cached property and no parent
                class provides ``__getattr__``.
        """
        if attr in self.cached:
            return cache.get(self.cachekey(attr), self.cached.get(attr))
        elif hasattr(super(), "__getattr__"):
            return super().__getattr__(attr)
        else:
            raise AttributeError("can't get attribute")

    def __setattr__(self, attr, value):
        """Store cached properties in the cache, others on the instance.

        Raises:
            AttributeError: If ``attr`` is not a cached property and no parent
                class provides ``__setattr__``.
        """
        if attr in self.cached:
            cache.set(self.cachekey(attr), value)
        elif hasattr(super(), "__setattr__"):
            super().__setattr__(attr, value)
        else:
            raise AttributeError("can't set attribute")

    def clear_cache(self):
        """Delete every cached property of this object from the cache."""
        cache.delete_many([self.cachekey(attr) for attr in self.cached])


# Consumers


class DjangoJsonWebsocketConsumer(AsyncJsonWebsocketConsumer):
    """Custom Json Websocket Consumer.

    Encode to JSON with DjangoJSONEncoder.
    """

    @classmethod
    async def encode_json(cls, content):
        """Serialise ``content`` to a JSON string using DjangoJSONEncoder."""
        return json.dumps(content, cls=DjangoJSONEncoder)


class PermConsumerMixin:
    """Add support to check permissions on consumers.

    The user is read from ``self.scope["user"]`` when the connection is
    opened; the connection is accepted only if that user holds every
    permission in ``perms_connect``.

    Attributes:
        perms_connect (list): Required permissions to connect to this
            consumer.
    """

    # NOTE(review): `connect` below reads the user from `self.scope`, not from
    # `message.user`; confirm whether this flag is still needed.
    http_user = True  # Enable message.user
    perms_connect = []

    async def connect(self):
        """Check permissions on connection.

        Accepts the connection (via the parent ``connect``) when the user has
        all of ``perms_connect``; closes it otherwise.
        """
        self.user = self.scope["user"]
        # `has_perms` is synchronous and may query the database through the
        # auth backends, so it must not run directly on the event loop.
        allowed = await database_sync_to_async(self.user.has_perms)(
            self.perms_connect
        )
        if allowed:
            await super().connect()
        else:
            await self.close()

    # NOTE(review): the code below is disabled. It appears to be an earlier
    # implementation that passed `message.user` to `connection_groups`; it is
    # kept for reference only.
    #
    # async def raw_connect(self, message, **kwargs):
    #     # Same as original raw_connect method of JsonWebsocketConsumer
    #     # We add user to connection_groups call.
    #     groups = self.connection_groups(user=message.user, **kwargs)
    #     for group in groups:
    #         await self.channel_layer.group_add(group, message.reply_channel)
    #         # Group(group, channel_layer=message.channel_layer).add(
    #         #     message.reply_channel
    #         # )
    #     self.connect(message, **kwargs)
    #
    # async def raw_disconnect(self, message, **kwargs):
    #     # Same as original raw_disconnect method of JsonWebsocketConsumer
    #     # We add user to connection_groups call.
    #     groups = self.connection_groups(user=message.user, **kwargs)
    #     for group in groups:
    #         await self.channel_layer.group_discard(group, message.reply_channel)
    #         # Group(group, channel_layer=message.channel_layer).discard(
    #         #     message.reply_channel
    #         # )
    #     self.disconnect(message, **kwargs)
    #
    # def connection_groups(self, user, **kwargs):
    #     """`message.user` is available as `user` arg. Original behavior."""
    #     return super().connection_groups(user=user, **kwargs)