kpsul/shared/channels.py

45 lines
1.4 KiB
Python

import datetime
import random
from decimal import Decimal
import msgpack
from channels_redis.core import RedisChannelLayer
def encode_kf(obj):
if isinstance(obj, Decimal):
return {"__decimal__": True, "as_str": str(obj)}
elif isinstance(obj, datetime.datetime):
return {"__datetime__": True, "as_str": obj.strftime("%Y%m%dT%H:%M:%S.%f")}
return obj
def decode_kf(obj):
if "__decimal__" in obj:
obj = Decimal(obj["as_str"])
elif "__datetime__" in obj:
obj = datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")
return obj
class ChannelLayer(RedisChannelLayer):
def serialize(self, message):
"""Serializes to a byte string."""
value = msgpack.packb(message, default=encode_kf, use_bin_type=True)
if self.crypter:
value = self.crypter.encrypt(value)
# As we use an sorted set to expire messages
# we need to guarantee uniqueness, with 12 bytes.
random_prefix = random.getrandbits(8 * 12).to_bytes(12, "big")
return random_prefix + value
def deserialize(self, message):
"""Deserializes from a byte string."""
# Removes the random prefix
message = message[12:]
if self.crypter:
message = self.crypter.decrypt(message, self.expiry + 10)
return msgpack.unpackb(message, object_hook=decode_kf, raw=False)