45 lines
1.4 KiB
Python
45 lines
1.4 KiB
Python
import datetime
|
|
import random
|
|
from decimal import Decimal
|
|
|
|
import msgpack
|
|
from channels_redis.core import RedisChannelLayer
|
|
|
|
|
|
def encode_kf(obj):
    """Convert ``Decimal`` and ``datetime`` values into tagged dicts.

    Used in this module as the ``default`` hook of ``msgpack.packb``.

    * ``Decimal`` becomes ``{"__decimal__": True, "as_str": str(obj)}``.
    * ``datetime.datetime`` becomes ``{"__datetime__": True, "as_str": ...}``
      where ``as_str`` is formatted as ``%Y%m%dT%H:%M:%S.%f``.  The format
      contains no ``%z`` directive, so a UTC offset is not written out.

    Any other object is returned unchanged.
    """
    if isinstance(obj, Decimal):
        text = str(obj)
        return {"__decimal__": True, "as_str": text}

    if isinstance(obj, datetime.datetime):
        text = obj.strftime("%Y%m%dT%H:%M:%S.%f")
        return {"__datetime__": True, "as_str": text}

    # Not a type handled here: hand the object back as-is.
    return obj
|
|
|
|
|
|
def decode_kf(obj):
    """Turn a tagged mapping back into a ``Decimal`` or ``datetime``.

    Used in this module as the ``object_hook`` of ``msgpack.unpackb``.

    * A mapping containing the key ``"__decimal__"`` yields
      ``Decimal(obj["as_str"])``.
    * A mapping containing the key ``"__datetime__"`` yields a
      ``datetime.datetime`` parsed from ``obj["as_str"]`` with the format
      ``%Y%m%dT%H:%M:%S.%f``.

    Anything else is returned unchanged.
    """
    if "__decimal__" in obj:
        return Decimal(obj["as_str"])

    if "__datetime__" in obj:
        return datetime.datetime.strptime(obj["as_str"], "%Y%m%dT%H:%M:%S.%f")

    # No marker key present: leave the mapping untouched.
    return obj
|
|
|
|
|
|
class ChannelLayer(RedisChannelLayer):
    """``RedisChannelLayer`` whose messages may contain ``Decimal``/``datetime``.

    Overrides ``serialize`` and ``deserialize`` so that msgpack is invoked
    with the ``encode_kf`` / ``decode_kf`` hooks defined in this module.
    """

    def serialize(self, message):
        """Pack *message* to bytes, with a 12-byte random prefix.

        The message is packed with msgpack (``encode_kf`` handles otherwise
        unsupported objects), encrypted when ``self.crypter`` is truthy, and
        then prefixed with 12 random bytes.
        """
        packed = msgpack.packb(message, default=encode_kf, use_bin_type=True)

        if self.crypter:
            packed = self.crypter.encrypt(packed)

        # Messages are expired through a sorted set, so every serialized
        # value has to be unique; a 12-byte random prefix provides that.
        prefix_len = 12
        prefix = random.getrandbits(8 * prefix_len).to_bytes(prefix_len, "big")
        return prefix + packed

    def deserialize(self, message):
        """Unpack a byte string produced by ``serialize``.

        The first 12 bytes (the random prefix) are discarded.  When
        ``self.crypter`` is truthy the remainder is decrypted, passing
        ``self.expiry + 10`` as the second argument (presumably a maximum
        age in seconds; confirm against the crypter's API).  The result is
        unpacked with msgpack using ``decode_kf`` as the object hook.
        """
        # Drop the random uniqueness prefix.
        payload = message[12:]

        if self.crypter:
            payload = self.crypter.decrypt(payload, self.expiry + 10)

        return msgpack.unpackb(payload, object_hook=decode_kf, raw=False)
|