206 lines
6.7 KiB
Python
206 lines
6.7 KiB
Python
|
import logging
|
||
|
from enum import Enum
|
||
|
from typing import Optional
|
||
|
|
||
|
import serial
|
||
|
|
||
|
|
||
|
class MsgTypes(Enum):
|
||
|
"""
|
||
|
Gadget message types
|
||
|
"""
|
||
|
|
||
|
REPROGRAM_FIRMWARE = b"\x01"
|
||
|
FLASH_FIRMWARE_PAGE = b"\x02"
|
||
|
GET_WIDGET_PARAMS = b"\x03"
|
||
|
SET_WIDGET_PARAMS = b"\x04"
|
||
|
RECEIVED_DMX = b"\x05"
|
||
|
SEND_DMX = b"\x06"
|
||
|
RECEIVE_DMX_ON_CHANGE = b"\x08"
|
||
|
RECEIVE_DMX_CHANGE_OF_STATE = b"\x09"
|
||
|
GET_SERIAL_NUMBER = b"\x0a"
|
||
|
|
||
|
|
||
|
def from_bytes(b: bytes) -> int:
|
||
|
"""
|
||
|
Convert bytes to int (endianness compatible with the Gadget)
|
||
|
"""
|
||
|
return int.from_bytes(b, "little")
|
||
|
|
||
|
|
||
|
def to_bytes(b: int, length: int = 1) -> bytes:
|
||
|
"""
|
||
|
Convert int to bytes (endianness compatible with the Gadget)
|
||
|
"""
|
||
|
return b.to_bytes(length, "little")
|
||
|
|
||
|
|
||
|
class Widget:
|
||
|
"""
|
||
|
Class managing the communication with the gadget
|
||
|
"""
|
||
|
|
||
|
def __init__(self, serial_device_path: Optional[str] = None) -> None:
|
||
|
self._dmx = bytes(512)
|
||
|
self.break_time: Optional[int] = None
|
||
|
self.mab_time: Optional[int] = None
|
||
|
self.rate: Optional[int] = None
|
||
|
self.firmware: tuple[Optional[int], Optional[int]] = (None, None)
|
||
|
self._auto_commit = True
|
||
|
|
||
|
self.dmx_staging = bytearray(512)
|
||
|
self.serial_device_path = serial_device_path
|
||
|
self._serial = serial.Serial(self.serial_device_path, 115200)
|
||
|
if self._serial.is_open:
|
||
|
self._init_widget()
|
||
|
|
||
|
@property
|
||
|
def auto_commit(self) -> bool:
|
||
|
"""
|
||
|
Get `auto_commit` value, `True` if auto commit is enabled
|
||
|
"""
|
||
|
return self._auto_commit
|
||
|
|
||
|
@auto_commit.setter
|
||
|
def auto_commit(self, value: bool):
|
||
|
"""
|
||
|
Set `auto_commit` value
|
||
|
"""
|
||
|
self._auto_commit = value
|
||
|
if self._auto_commit:
|
||
|
self.commit()
|
||
|
|
||
|
@property
|
||
|
def is_open(self) -> bool:
|
||
|
return self._serial.is_open
|
||
|
|
||
|
def __enter__(self):
|
||
|
self._serial = serial.Serial(self.serial_device_path, 115200)
|
||
|
self._serial = self._serial.__enter__()
|
||
|
if self._serial.is_open:
|
||
|
self._init_widget()
|
||
|
return self
|
||
|
|
||
|
def _init_widget(self):
|
||
|
# Retrieve widget parameters
|
||
|
self.send_message(MsgTypes.GET_WIDGET_PARAMS, b"\x00\x00")
|
||
|
msg_type, data, _ = None, None, None
|
||
|
while msg_type != MsgTypes.GET_WIDGET_PARAMS.value:
|
||
|
msg_type, data, _ = self.read_message()
|
||
|
assert msg_type is not None and data is not None
|
||
|
|
||
|
self.break_time = data[2]
|
||
|
self.mab_time = data[3]
|
||
|
self.rate = data[4]
|
||
|
self.firmware = (data[1], data[0])
|
||
|
self.send_message(MsgTypes.SEND_DMX, b"\x00" + self._dmx)
|
||
|
logging.info("[WIDGET] Initialized widget.")
|
||
|
logging.info(f"[WIDGET] Break time: {self.break_time * 10.67}µs")
|
||
|
logging.info(f"[WIDGET] MAB time: {self.mab_time * 10.67}µs")
|
||
|
logging.info(f"[WIDGET] DMX rate: {self.rate}Hz")
|
||
|
logging.info(f"[WIDGET] Firmware v{self.firmware[0]}.{self.firmware[1]}")
|
||
|
|
||
|
def __exit__(self, exc_type, exc_value, traceback):
|
||
|
self._serial.__exit__(exc_type, exc_value, traceback)
|
||
|
|
||
|
def set_params(
|
||
|
self,
|
||
|
break_time: Optional[int] = None,
|
||
|
mab_time: Optional[int] = None,
|
||
|
rate: Optional[int] = None,
|
||
|
) -> None:
|
||
|
"""
|
||
|
Set widget params
|
||
|
"""
|
||
|
if break_time is not None:
|
||
|
if break_time >= 128 or break_time < 9:
|
||
|
raise ValueError(
|
||
|
f"Break time must be in [9,128) range, provided {break_time}"
|
||
|
)
|
||
|
self.break_time = break_time
|
||
|
if mab_time is not None:
|
||
|
if mab_time >= 128 or mab_time < 1:
|
||
|
raise ValueError(
|
||
|
f"MAB time must be in [1,128) range, provided {mab_time}"
|
||
|
)
|
||
|
self.mab_time = mab_time
|
||
|
if rate is not None:
|
||
|
if rate >= 41 or rate < 0:
|
||
|
raise ValueError(f"DMX rate must be in [0,41) range, provided {rate}")
|
||
|
self.rate = rate
|
||
|
assert (
|
||
|
self.break_time is not None
|
||
|
and self.mab_time is not None
|
||
|
and self.rate is not None
|
||
|
)
|
||
|
data = (
|
||
|
b"\x00\x00"
|
||
|
+ to_bytes(self.break_time)
|
||
|
+ to_bytes(self.mab_time)
|
||
|
+ to_bytes(self.rate)
|
||
|
)
|
||
|
self.send_message(MsgTypes.SET_WIDGET_PARAMS, data)
|
||
|
|
||
|
def set_dmx(
|
||
|
self,
|
||
|
addr: int,
|
||
|
data: bytes,
|
||
|
addr_starting: int = 0,
|
||
|
) -> None:
|
||
|
"""
|
||
|
Set dmx values that will be sent.
|
||
|
|
||
|
If `auto_commit` is set to false, to provided configuration won't be
|
||
|
sent but memorized until the next call to `Widget.commit()` (or a call
|
||
|
to this function with `auto_commit = True`).
|
||
|
"""
|
||
|
true_addr = addr - addr_starting
|
||
|
if len(data) + true_addr > 512:
|
||
|
raise ValueError("Can't send more than 512 value over dmx")
|
||
|
self.dmx_staging[true_addr : true_addr + len(data)] = data
|
||
|
if self.auto_commit:
|
||
|
self.commit()
|
||
|
|
||
|
def commit(self) -> None:
|
||
|
"""Commit the staging DMX data and send it"""
|
||
|
self._dmx = bytes(self.dmx_staging)
|
||
|
self.send_message(MsgTypes.SEND_DMX, b"\x00" + self._dmx)
|
||
|
|
||
|
def send_message(self, msg_type: MsgTypes, data: bytes) -> None:
|
||
|
"""
|
||
|
Lower level API for sending instruction to widget.
|
||
|
Prefer the use of `Widget.set_dmx` or `Widget.set_params`.
|
||
|
"""
|
||
|
if msg_type not in MsgTypes:
|
||
|
raise KeyError("Message type not known")
|
||
|
datalen = to_bytes(len(data), 2)
|
||
|
if not self._serial.is_open:
|
||
|
raise ValueError("Serial port is closed")
|
||
|
message = b"\x7E" + msg_type.value + datalen + data + b"\xE7"
|
||
|
logging.debug(f"[SERIAL] Sending {message}")
|
||
|
self._serial.write(message)
|
||
|
# self._serial.flush()
|
||
|
|
||
|
def read_message(self) -> tuple[bytes, bytes, bool]:
|
||
|
"""
|
||
|
Low level API for reading message sent by the widget.
|
||
|
"""
|
||
|
current_byte = b"\x00"
|
||
|
# Flush until start byte
|
||
|
while current_byte != b"\x7E":
|
||
|
current_byte = self._serial.read(1)
|
||
|
logging.debug(f"[SERIAL] Read {current_byte}")
|
||
|
msg_type = self._serial.read(1)
|
||
|
logging.debug(f"[SERIAL] Read {msg_type}")
|
||
|
data_len_raw = self._serial.read(2)
|
||
|
logging.debug(f"[SERIAL] Read {data_len_raw}")
|
||
|
data_len = from_bytes(data_len_raw)
|
||
|
data = self._serial.read(data_len)
|
||
|
logging.debug(f"[SERIAL] Read {data}")
|
||
|
end_byte = self._serial.read(1)
|
||
|
logging.debug(f"[SERIAL] Read {end_byte}")
|
||
|
is_correct = end_byte == b"\xE7"
|
||
|
if not is_correct:
|
||
|
logging.warning("[SERIAL] Invalid message received")
|
||
|
return msg_type, data, is_correct
|