import datetime
import random
from decimal import Decimal

import msgpack
from channels_redis.core import RedisChannelLayer

# Wire format used for datetimes. Timezone-aware values get their UTC offset
# appended via "%z" (e.g. "+0200"); naive values carry no suffix.
_DATETIME_FORMAT = "%Y%m%dT%H:%M:%S.%f"

# Number of random bytes prepended to every serialized message.
_RANDOM_PREFIX_LENGTH = 12


def encode_kf(obj):
    """Convert ``Decimal`` and ``datetime`` values into tagged dicts.

    Used as the ``default`` hook of ``msgpack.packb`` in
    ``ChannelLayer.serialize``.

    Args:
        obj: The object handed to the hook.

    Returns:
        ``{"__decimal__": True, "as_str": ...}`` for a ``Decimal``,
        ``{"__datetime__": True, "as_str": ...}`` for a ``datetime``,
        otherwise ``obj`` unchanged.
    """
    if isinstance(obj, Decimal):
        return {"__decimal__": True, "as_str": str(obj)}
    if isinstance(obj, datetime.datetime):
        # "%z" expands to an empty string for naive datetimes, so naive
        # payloads are byte-identical to the historical format, while aware
        # datetimes keep their UTC offset instead of silently losing it.
        return {
            "__datetime__": True,
            "as_str": obj.strftime(_DATETIME_FORMAT + "%z"),
        }
    return obj


def decode_kf(obj):
    """Rebuild ``Decimal`` and ``datetime`` values from tagged dicts.

    Used as the ``object_hook`` of ``msgpack.unpackb`` in
    ``ChannelLayer.deserialize``; the inverse of ``encode_kf``.

    Args:
        obj: A decoded mapping.

    Returns:
        A ``Decimal`` when the mapping carries the ``"__decimal__"`` key, a
        ``datetime`` when it carries ``"__datetime__"``, otherwise ``obj``
        unchanged. Aware datetimes come back with a fixed-offset
        ``datetime.timezone`` (same instant and offset, not the original
        tzinfo object).
    """
    if "__decimal__" in obj:
        return Decimal(obj["as_str"])
    if "__datetime__" in obj:
        as_str = obj["as_str"]
        # The naive format contains no sign characters, so a "+" or "-" can
        # only come from a trailing UTC offset. Strings without one are parsed
        # exactly as before, keeping old payloads readable.
        has_offset = "+" in as_str or "-" in as_str
        fmt = _DATETIME_FORMAT + "%z" if has_offset else _DATETIME_FORMAT
        return datetime.datetime.strptime(as_str, fmt)
    return obj


class ChannelLayer(RedisChannelLayer):
    """Redis channel layer whose messages may contain Decimal and datetime.

    Overrides ``serialize``/``deserialize`` to run msgpack with the
    ``encode_kf``/``decode_kf`` hooks. ``self.crypter`` and ``self.expiry``
    are not defined here; presumably they are provided by
    ``RedisChannelLayer`` (confirm against the installed channels_redis).
    """

    def serialize(self, message):
        """Serialize ``message`` to a byte string.

        The message is packed with msgpack, encrypted when ``self.crypter``
        is set, and prefixed with random bytes.
        """
        value = msgpack.packb(message, default=encode_kf, use_bin_type=True)
        if self.crypter:
            value = self.crypter.encrypt(value)

        # As we use a sorted set to expire messages, each stored value must
        # be unique; a 12-byte random prefix guarantees that.
        random_prefix = random.getrandbits(8 * _RANDOM_PREFIX_LENGTH).to_bytes(
            _RANDOM_PREFIX_LENGTH, "big"
        )
        return random_prefix + value

    def deserialize(self, message):
        """Deserialize a byte string produced by ``serialize``."""
        # Remove the random prefix added by serialize().
        message = message[_RANDOM_PREFIX_LENGTH:]
        if self.crypter:
            # NOTE(review): the second argument looks like a max-age/TTL in
            # seconds (expiry plus a 10 s margin) - confirm against the
            # crypter's decrypt() signature.
            message = self.crypter.decrypt(message, self.expiry + 10)
        return msgpack.unpackb(message, object_hook=decode_kf, raw=False)