From 9f14d6b46f6adb27dc409af78c23af71a631853f Mon Sep 17 00:00:00 2001 From: Constantin Gierczak--Galle Date: Sat, 9 Dec 2023 16:31:17 +0100 Subject: [PATCH] Initial commit --- README.md | 75 ++++++++++++++++ default.nix | 2 + npins/default.nix | 47 ++++++++++ npins/sources.json | 11 +++ pyjecteur.nix | 19 ++++ pyjecteur/__init__.py | 3 + pyjecteur/dummy.py | 176 ++++++++++++++++++++++++++++++++++++ pyjecteur/fixtures.py | 109 ++++++++++++++++++++++ pyjecteur/lights.py | 135 ++++++++++++++++++++++++++++ pyjecteur/reactive.py | 174 +++++++++++++++++++++++++++++++++++ pyjecteur/widget.py | 205 ++++++++++++++++++++++++++++++++++++++++++ setup.py | 11 +++ shell.nix | 40 +++++++++ 13 files changed, 1007 insertions(+) create mode 100644 README.md create mode 100644 default.nix create mode 100644 npins/default.nix create mode 100644 npins/sources.json create mode 100644 pyjecteur.nix create mode 100644 pyjecteur/__init__.py create mode 100644 pyjecteur/dummy.py create mode 100644 pyjecteur/fixtures.py create mode 100644 pyjecteur/lights.py create mode 100644 pyjecteur/reactive.py create mode 100644 pyjecteur/widget.py create mode 100644 setup.py create mode 100644 shell.nix diff --git a/README.md b/README.md new file mode 100644 index 0000000..f78f0b3 --- /dev/null +++ b/README.md @@ -0,0 +1,75 @@ +# `pyjecteur` + +## Obtenir un nix-shell avec `pyjecteur` (équivalent d'un virtualenv mais avec nix) + +Installer nix (`curl --proto '=https' --tlsv1.2 -sSf -L https://install.determinate.systems/nix | sh -s -- install`) puis executer + +``` +nix-shell -A prod +``` + +## Documentation rapide + +### `pyjecteur.widget.Widget` + +Contient la logique d'interaction avec le module DMX-USB. Pour l'utiliser, il +suffit d'instancier l'objet puis d'appler la méthode `set_dmx`: +```python +def set_dmx(self, addr: int, data: bytes, addr_starting: int = 0) -> None: + """ + Set dmx values that will be sent. + + If `auto_commit` is set to false, to provided configuration won't be + sent but memorized until the next call to `Widget.commit()` (or a call + to this function with `auto_commit = True`). + """ +``` + +Pour des interactions à plus haut niveau, continuez de lire + +### `pyjecteur.lights.Universe` + +Objet représentant un Univers. + +```python +class Universe: + """Represents a DMX universe. + + Manages the adress space and responsibles for sending DMX to widget when + `Universe.update_dmx()` is called. + """ + + def __init__(self, widget): + """ + Initializes the Universe + + widget must be a class providing `widget.set_dmx(data, address)` + """ + + def register(self, light: "AbstractLight", address: int) -> None: + """ + Register a light at the specified address + """ + + def update_dmx(self, light: "AbstractLight", data: Union[bytearray, bytes]) -> None: + """ + Update the dmx data of the specified light + """ +``` + +### Interéagir avec un projecteur + +Globalement, il faut instancier l'objet représentant le projecteur puis +l'enregister sur son Univers (`Universe.register(light, address)`, ATTENTION +les addresses commencent à 0). + +Ensuite il y a deux façon de changer les paramètres: + + 1. Changer les attributs (`projo.pan = 100`) + 2. Changer les addresses directement avec la notation index (change la valeur + à l'addresse `addresse_projo + index`) + + +## Déclaration d'un nouveau type de projecteur + +S'inspirer du fichier `pyjecteur/fixtures.py` diff --git a/default.nix b/default.nix new file mode 100644 index 0000000..f759f25 --- /dev/null +++ b/default.nix @@ -0,0 +1,2 @@ +{ pkgs ? import (import ./npins).nixpkgs {} }: +pkgs.python310Packages.callPackage ./pyjecteur.nix {} diff --git a/npins/default.nix b/npins/default.nix new file mode 100644 index 0000000..4a7c372 --- /dev/null +++ b/npins/default.nix @@ -0,0 +1,47 @@ +# Generated by npins. Do not modify; will be overwritten regularly +let + data = builtins.fromJSON (builtins.readFile ./sources.json); + version = data.version; + + mkSource = spec: + assert spec ? type; let + path = + if spec.type == "Git" then mkGitSource spec + else if spec.type == "GitRelease" then mkGitSource spec + else if spec.type == "PyPi" then mkPyPiSource spec + else if spec.type == "Channel" then mkChannelSource spec + else builtins.throw "Unknown source type ${spec.type}"; + in + spec // { outPath = path; }; + + mkGitSource = { repository, revision, url ? null, hash, ... }: + assert repository ? type; + # At the moment, either it is a plain git repository (which has an url), or it is a GitHub/GitLab repository + # In the latter case, there we will always be an url to the tarball + if url != null then + (builtins.fetchTarball { + inherit url; + sha256 = hash; # FIXME: check nix version & use SRI hashes + }) + else assert repository.type == "Git"; builtins.fetchGit { + url = repository.url; + rev = revision; + # hash = hash; + }; + + mkPyPiSource = { url, hash, ... }: + builtins.fetchurl { + inherit url; + sha256 = hash; + }; + + mkChannelSource = { url, hash, ... }: + builtins.fetchTarball { + inherit url; + sha256 = hash; + }; +in +if version == 3 then + builtins.mapAttrs (_: mkSource) data.pins +else + throw "Unsupported format version ${toString version} in sources.json. Try running `npins upgrade`" diff --git a/npins/sources.json b/npins/sources.json new file mode 100644 index 0000000..bf9ad74 --- /dev/null +++ b/npins/sources.json @@ -0,0 +1,11 @@ +{ + "pins": { + "nixpkgs": { + "type": "Channel", + "name": "nixpkgs-unstable", + "url": "https://releases.nixos.org/nixpkgs/nixpkgs-23.11pre536306.12bdeb01ff9e/nixexprs.tar.xz", + "hash": "00az00b7h8g4h0n6kvrmjykk70mz91dd5v9fy1fbxzszisvgl9d3" + } + }, + "version": 3 +} \ No newline at end of file diff --git a/pyjecteur.nix b/pyjecteur.nix new file mode 100644 index 0000000..7bf88fd --- /dev/null +++ b/pyjecteur.nix @@ -0,0 +1,19 @@ +{ lib , pkgs, stdenv, setuptoolsBuildHook, buildPythonPackage, pythonPackages , fetchFromGitHub }: +let + attrs = { + name = "pyjecteur"; + version = "2.0"; + doCheck = false; + src = ./. ; + passthru = { + wheel = buildPythonPackage (attrs // { + name = "pyjecteur-py3.whl"; + installPhase = "mv dist/pyjecteur-2.0-py3-none-any.whl $out"; + dontFixup = true; + doInstallCheck = false; + }); + }; + propagatedBuildInputs = [ pythonPackages.pyserial pythonPackages.colour ]; + }; +in +buildPythonPackage attrs diff --git a/pyjecteur/__init__.py b/pyjecteur/__init__.py new file mode 100644 index 0000000..b747ebc --- /dev/null +++ b/pyjecteur/__init__.py @@ -0,0 +1,3 @@ +""" +pyjecteur is a python framework for programming lights in DMX +""" diff --git a/pyjecteur/dummy.py b/pyjecteur/dummy.py new file mode 100644 index 0000000..7a463ff --- /dev/null +++ b/pyjecteur/dummy.py @@ -0,0 +1,176 @@ +import logging +from enum import Enum +from typing import Optional + + +class MsgTypes(Enum): + """ + Gadget message types + """ + + REPROGRAM_FIRMWARE = b"\x01" + FLASH_FIRMWARE_PAGE = b"\x02" + GET_WIDGET_PARAMS = b"\x03" + SET_WIDGET_PARAMS = b"\x04" + RECEIVED_DMX = b"\x05" + SEND_DMX = b"\x06" + RECEIVE_DMX_ON_CHANGE = b"\x08" + RECEIVE_DMX_CHANGE_OF_STATE = b"\x09" + GET_SERIAL_NUMBER = b"\x0a" + + +def from_bytes(b: bytes) -> int: + """ + Convert bytes to int (endianness compatible with the Gadget) + """ + return int.from_bytes(b, "little") + + +def to_bytes(b: int, length: int = 1) -> bytes: + """ + Convert int to bytes (endianness compatible with the Gadget) + """ + return b.to_bytes(length, "little") + + +class Widget: + """ + Class managing the communication with the gadget + """ + + def __init__(self, serial_device_path: Optional[str] = None) -> None: + self._dmx = bytes(512) + self.break_time: Optional[int] = None + self.mab_time: Optional[int] = None + self.rate: Optional[int] = None + self.firmware: tuple[Optional[int], Optional[int]] = (None, None) + self._auto_commit = True + + self.dmx_staging = bytearray(512) + self.serial_device_path = serial_device_path + + @property + def auto_commit(self) -> bool: + """ + Get `auto_commit` value, `True` if auto commit is enabled + """ + return self._auto_commit + + @auto_commit.setter + def auto_commit(self, value: bool): + """ + Set `auto_commit` value + """ + self._auto_commit = value + if self._auto_commit: + self.commit() + + @property + def is_open(self) -> bool: + return True + + def __enter__(self): + return self + + def _init_widget(self): + # Retrieve widget parameters + self.send_message(MsgTypes.GET_WIDGET_PARAMS, b"\x00\x00") + msg_type, data, _ = None, None, None + while msg_type != MsgTypes.GET_WIDGET_PARAMS.value: + msg_type, data, _ = self.read_message() + assert msg_type is not None and data is not None + + self.break_time = data[2] + self.mab_time = data[3] + self.rate = data[4] + self.firmware = (data[1], data[0]) + self.send_message(MsgTypes.SEND_DMX, b"\x00" + self._dmx) + logging.info("[WIDGET] Initialized widget.") + logging.info(f"[WIDGET] Break time: {self.break_time * 10.67}µs") + logging.info(f"[WIDGET] MAB time: {self.mab_time * 10.67}µs") + logging.info(f"[WIDGET] DMX rate: {self.rate}Hz") + logging.info(f"[WIDGET] Firmware v{self.firmware[0]}.{self.firmware[1]}") + + def __exit__(self, exc_type, exc_value, traceback): + pass + + def set_params( + self, + break_time: Optional[int] = None, + mab_time: Optional[int] = None, + rate: Optional[int] = None, + ) -> None: + """ + Set widget params + """ + if break_time is not None: + if break_time >= 128 or break_time < 9: + raise ValueError( + f"Break time must be in [9,128) range, provided {break_time}" + ) + self.break_time = break_time + if mab_time is not None: + if mab_time >= 128 or mab_time < 1: + raise ValueError( + f"MAB time must be in [1,128) range, provided {mab_time}" + ) + self.mab_time = mab_time + if rate is not None: + if rate >= 41 or rate < 0: + raise ValueError(f"DMX rate must be in [0,41) range, provided {rate}") + self.rate = rate + assert ( + self.break_time is not None + and self.mab_time is not None + and self.rate is not None + ) + data = ( + b"\x00\x00" + + to_bytes(self.break_time) + + to_bytes(self.mab_time) + + to_bytes(self.rate) + ) + self.send_message(MsgTypes.SET_WIDGET_PARAMS, data) + + def set_dmx( + self, + addr: int, + data: bytes, + addr_starting: int = 0, + ) -> None: + """ + Set dmx values that will be sent. + + If `auto_commit` is set to false, to provided configuration won't be + sent but memorized until the next call to `Widget.commit()` (or a call + to this function with `auto_commit = True`). + """ + true_addr = addr - addr_starting + if len(data) + true_addr > 512: + raise ValueError("Can't send more than 512 value over dmx") + self.dmx_staging[true_addr : true_addr + len(data)] = data + if self.auto_commit: + self.commit() + + def commit(self) -> None: + """Commit the staging DMX data and send it""" + self._dmx = bytes(self.dmx_staging) + self.send_message(MsgTypes.SEND_DMX, b"\x00" + self._dmx) + + def send_message(self, msg_type: MsgTypes, data: bytes) -> None: + """ + Lower level API for sending instruction to widget. + Prefer the use of `Widget.set_dmx` or `Widget.set_params`. + """ + if msg_type not in MsgTypes: + raise KeyError("Message type not known") + datalen = to_bytes(len(data), 2) + message = b"\x7E" + msg_type.value + datalen + data + b"\xE7" + logging.debug(f"[SERIAL] Sending {message}") + # self._serial.flush() + + def read_message(self) -> tuple[bytes, bytes, bool]: + """ + Low level API for reading message sent by the widget. + """ + raise NotImplementedError() diff --git a/pyjecteur/fixtures.py b/pyjecteur/fixtures.py new file mode 100644 index 0000000..206bfb8 --- /dev/null +++ b/pyjecteur/fixtures.py @@ -0,0 +1,109 @@ +"""""" +from colour import Color + +from .lights import AbstractLight +from .reactive import L, RBool, RInt, RList + + +class Strob(AbstractLight): + address_size = 2 + freq = RInt(0, 1) + dim = RInt(0, 0) + + +class StrobInv(AbstractLight): + address_size = 2 + freq = RInt(0, 0) + dim = RInt(0, 1) + + +class Wash(AbstractLight): + """ + Wash + """ + + address_size = 14 + + pan = RInt(0, 0) + tilt = RInt(0, 1) + speed = RInt(0, 2) + red = RInt(0, 3) + green = RInt(0, 4) + blue = RInt(0, 5) + white = RInt(0, 6) + dimmer = RInt(255, 9) + shutter = RBool(True, 10, true_val=b"\x15") + zoom = RInt(0, 11) + + +class Tradi(AbstractLight): + """ + Tradi RGB + """ + + address_size = 3 + + red = RInt(0, 0) + green = RInt(0, 1) + blue = RInt(0, 2) + + +class ParMiskin: + """ + Par 56 led + """ + + address_size = 8 + + red = RInt(0, 0) + green = RInt(0, 1) + blue = RInt(0, 2) + dimmer = RInt(255, 7) + + +class ParLed(AbstractLight): + """ + Par Led Theatre + """ + + address_size = 7 + + red = RInt(0, 0) + green = RInt(0, 1) + blue = RInt(0, 2) + + dimmer = RInt(255, 6) + + +class Blinder(AbstractLight): + """ + Blinder + """ + + address_size = 51 + + dimmer = RInt(255, 1) + flash = RInt(0, 2) + colors = RList( + [Color(rgb=(0, 0, 0)) for i in range(16)], + 3, + 3, + from_byte=lambda x: Color(f"#{x.hex()}"), + to_byte=lambda x: bytes.fromhex(x.hex_l[1:]), + ) + + +class LedBar48Ch(AbstractLight): + """ + Led Bar addressed on 48 channels + """ + + address_size = 48 + + colors = RList( + [Color(rgb=(0, 0, 0)) for i in range(16)], + 0, + 3, + from_byte=lambda x: Color(f"#{x.hex()}"), + to_byte=lambda x: bytes.fromhex(x.hex_l[1:]), + ) diff --git a/pyjecteur/lights.py b/pyjecteur/lights.py new file mode 100644 index 0000000..fef42e3 --- /dev/null +++ b/pyjecteur/lights.py @@ -0,0 +1,135 @@ +""" +Module providing class for handling fixtures and generating the appropriate DMX. +""" +from copy import deepcopy +from typing import Any, Callable, Optional, Union + +from .reactive import BaseReactiveValue, ReactiveMixin +from .widget import Widget + + +class Universe: + """Represents a DMX universe. + + Manages the adress space and responsibles for sending DMX to widget when + `Universe.update_dmx()` is called. + """ + + lights = {} + + def __init__(self, widget): + """ + Initializes the Universe + + widget must be a class providing `widget.set_dmx(data, address)` + """ + self.widget: Widget = widget + + def register(self, light: "AbstractLight", address: int) -> None: + """ + Register a light at the specified address + """ + # TODO: add checks for address overlapping + self.lights[light] = address + light.register_universe(self) + light.update_dmx() + + def update_dmx(self, light: "AbstractLight", data: Union[bytearray, bytes]) -> None: + """ + Update the dmx data of the specified light + """ + # TODO: add checks for length + self.widget.set_dmx(self.lights[light], data) + + +class AbstractLight: + """ + Abstract class for lights + """ + + address_size: int = 0 + + def __init__(self): + self._universe: Optional[Universe] = None + # The dmx values + self._dmx: bytes = bytearray(self.address_size) + # dmx memory_view to change in O(1) the values + self._dmx_mv = memoryview(self._dmx) + + # Dict holdin conversion functions for attr values to dmx: + # { attr_name => (address, length, converter function) } + self._attrs_to_dmx: dict[str, tuple[int, int, Callable[[Any], bytes]]] = {} + + # List holding conversion functions from dmx bytes to attrs. + # [ ( "attr_name", dmx_addr, length, converter function ) ] + self._dmx_to_attrs: list[ + Optional[tuple[str, int, int, Callable[[bytes], Any]]] + ] = [None for _ in range(self.address_size)] + + self._enable_auto_update: bool = False + + for key, rValueObject in self.__class__.__dict__.items(): + if isinstance(rValueObject, BaseReactiveValue): + # On copie la valeur + val = deepcopy(rValueObject.value) + if isinstance(val, ReactiveMixin): + val.light = self + val.key = key + self._attrs_to_dmx[key] = rValueObject.attr_to_dmx() + + for i, length, callback in rValueObject.dmx_to_attr(): + for k in range(i, i + length): + self._dmx_to_attrs[k] = (key, i, length, callback) + # Finally set the attributes to their value + setattr(self, key, val) + self._enable_auto_update: bool = True + + def register_universe(self, universe: "Universe") -> None: + """Assign a universe to this light""" + if self._universe is not None: + raise ValueError("Can't assign light to more than one universe") + self._universe = universe + + def update_dmx(self) -> None: + """ + Method to be called when the DMX values may have changed. + + This method sends DMX velues to the Universe. It is automatically + triggered by property assignments. + """ + if self._universe is not None and self._enable_auto_update: + self._universe.update_dmx(self, self._dmx) + + def __setattr__(self, name: str, value: Any) -> None: + """ + Automatically update dmx when a fixture param is set + """ + self.__dict__[name] = value + if not name.startswith("_"): + self.attr_set_hook(name, value) + + def attr_set_hook(self, name, value): + """ + Hook to be called when an attribute is set in order to update DMX + values + """ + if name in self._attrs_to_dmx: + # if the attr is linked to dmx, update self._dmx + position, length, converter = self._attrs_to_dmx[name] + self._dmx_mv[position : position + length] = converter(value) + self.update_dmx() + + def __getitem__(self, key) -> bytes: + return self._dmx[key] + + def __setitem__(self, key: int, value: bytes) -> None: + self._dmx_mv[key : key + 1] = value + + if self._dmx_to_attrs[key] is not None: + attr, position, length, converter = self._dmx_to_attrs[ + key + ] # pyright: ignore + self._enable_auto_update = False + setattr(self, attr, converter(self._dmx[position : position + length])) + self._enable_auto_update = True + self.update_dmx() diff --git a/pyjecteur/reactive.py b/pyjecteur/reactive.py new file mode 100644 index 0000000..c252633 --- /dev/null +++ b/pyjecteur/reactive.py @@ -0,0 +1,174 @@ +""" +Module holding classes for easy specification of fixtures attributes +""" + +from typing import Any, Callable, Iterable, Optional + +from .widget import from_bytes, to_bytes + + +class ReactiveMixin: + """ + Mixin for data types that need to update + """ + + light = None + key: Optional[str] = None + + +class BaseReactiveValue: + """ + Abstract class for defining the parsers between DMX values and object attributes + """ + + address: Optional[list[int]] = None + value: Any = None + + def attr_to_dmx(self) -> tuple[int, int, Callable[[Any], bytes]]: + """ + Fonction qui retourne les informations pour convertir une valeur + d'attribut en DMX sous la forme: + ``` + (addresse, longueur, fonction_de_conversion) + ``` + """ + raise NotImplementedError() + + def dmx_to_attr(self) -> Iterable[tuple[int, int, Callable[[bytes], Any]]]: + """ + Fonction qui retourne les informations pour mettre à jour les attributs + si le DMX a changé sous la forme + ``` + list((addresse, longueur, fonction_de_conversion)) + ``` + """ + raise NotImplementedError() + + +class RInt(BaseReactiveValue): + """ + Int light attribute + """ + + def __init__(self, value: int, address: int, length: int = 1): + self.value: int = value + self.address: int = address + self.length: int = length + + def dmx_to_attr(self): + return [(self.address, self.length, lambda _, b: from_bytes(b))] + + def attr_to_dmx(self): + return (self.address, self.length, lambda x: to_bytes(x, length=self.length)) + + +class RBool(BaseReactiveValue): + """ + Boolean light attribute + """ + + def __init__( # pylint: disable=too-many-arguments + self, + value, + address, + true_val: bytes = b"\xff", + false_val: bytes = b"\x00", + length=1, + ): + self.value = value + self.address = address + self.length = length + self.true_val = true_val + self.false_val = false_val + + def dmx_to_attr(self): + return [(self.address, self.length, lambda _, x: x == self.true_val)] + + def attr_to_dmx(self): + return ( + self.address, + self.length, + lambda x: self.true_val if x else self.false_val, + ) + + +class L(ReactiveMixin): # ruff: disable=invalid-name + """ + Thin wrapper around lists to handle reactivity inside lists + """ + + def __init__( + self, + val: list[Any], + ): + self._val = val + + def __len__(self): + return len(self._val) + + def __getitem__(self, key): + return self._val[key] + + def __setitem__(self, key, value): + self._val[key] = value + if self.light: + self.light.attr_set_hook(self.key, self) + + def __iter__(self): + return self._val.__iter__() + + +class RList(BaseReactiveValue): + """ + List light attribute + """ + + def __init__( # pylint: disable=too-many-arguments + self, + value: list[Any], + address: int, + unit_size: int = 1, + from_byte: Callable[[bytes], Any] = from_bytes, + to_byte: Callable[[Any], bytes] = to_bytes, + ): + """ + value: L([]) object + address: DMX address + unit_size: size of one value in the list (number of DMX slots it takes) + from_bytes, to_byte: convert function from list value to dmx bytes + (and reciprocally) + """ + self.value = L(value) + self.address: int = address + self.unit_size = unit_size + self.length = len(value) * unit_size + self.from_byte = from_byte + self.to_byte = to_byte + + def dmx_to_attr(self): + def parser_factory(i): + """ + Factory to create functions that updates i-th value of list + """ + + def parser(value, bytes_val): + value._val[i] = self.from_byte(bytes_val) + return value + + return parser + + return [ + ( + self.address + i * self.unit_size, + self.unit_size, + parser_factory(i), + ) + for i in range(len(self.value)) + ] + + def attr_to_dmx(self): + return ( + self.address, + self.length, + lambda x: b"".join([self.to_byte(i) for i in x]), + ) diff --git a/pyjecteur/widget.py b/pyjecteur/widget.py new file mode 100644 index 0000000..880eef7 --- /dev/null +++ b/pyjecteur/widget.py @@ -0,0 +1,205 @@ +import logging +from enum import Enum +from typing import Optional + +import serial + + +class MsgTypes(Enum): + """ + Gadget message types + """ + + REPROGRAM_FIRMWARE = b"\x01" + FLASH_FIRMWARE_PAGE = b"\x02" + GET_WIDGET_PARAMS = b"\x03" + SET_WIDGET_PARAMS = b"\x04" + RECEIVED_DMX = b"\x05" + SEND_DMX = b"\x06" + RECEIVE_DMX_ON_CHANGE = b"\x08" + RECEIVE_DMX_CHANGE_OF_STATE = b"\x09" + GET_SERIAL_NUMBER = b"\x0a" + + +def from_bytes(b: bytes) -> int: + """ + Convert bytes to int (endianness compatible with the Gadget) + """ + return int.from_bytes(b, "little") + + +def to_bytes(b: int, length: int = 1) -> bytes: + """ + Convert int to bytes (endianness compatible with the Gadget) + """ + return b.to_bytes(length, "little") + + +class Widget: + """ + Class managing the communication with the gadget + """ + + def __init__(self, serial_device_path: Optional[str] = None) -> None: + self._dmx = bytes(512) + self.break_time: Optional[int] = None + self.mab_time: Optional[int] = None + self.rate: Optional[int] = None + self.firmware: tuple[Optional[int], Optional[int]] = (None, None) + self._auto_commit = True + + self.dmx_staging = bytearray(512) + self.serial_device_path = serial_device_path + self._serial = serial.Serial(self.serial_device_path, 115200) + if self._serial.is_open: + self._init_widget() + + @property + def auto_commit(self) -> bool: + """ + Get `auto_commit` value, `True` if auto commit is enabled + """ + return self._auto_commit + + @auto_commit.setter + def auto_commit(self, value: bool): + """ + Set `auto_commit` value + """ + self._auto_commit = value + if self._auto_commit: + self.commit() + + @property + def is_open(self) -> bool: + return self._serial.is_open + + def __enter__(self): + self._serial = serial.Serial(self.serial_device_path, 115200) + self._serial = self._serial.__enter__() + if self._serial.is_open: + self._init_widget() + return self + + def _init_widget(self): + # Retrieve widget parameters + self.send_message(MsgTypes.GET_WIDGET_PARAMS, b"\x00\x00") + msg_type, data, _ = None, None, None + while msg_type != MsgTypes.GET_WIDGET_PARAMS.value: + msg_type, data, _ = self.read_message() + assert msg_type is not None and data is not None + + self.break_time = data[2] + self.mab_time = data[3] + self.rate = data[4] + self.firmware = (data[1], data[0]) + self.send_message(MsgTypes.SEND_DMX, b"\x00" + self._dmx) + logging.info("[WIDGET] Initialized widget.") + logging.info(f"[WIDGET] Break time: {self.break_time * 10.67}µs") + logging.info(f"[WIDGET] MAB time: {self.mab_time * 10.67}µs") + logging.info(f"[WIDGET] DMX rate: {self.rate}Hz") + logging.info(f"[WIDGET] Firmware v{self.firmware[0]}.{self.firmware[1]}") + + def __exit__(self, exc_type, exc_value, traceback): + self._serial.__exit__(exc_type, exc_value, traceback) + + def set_params( + self, + break_time: Optional[int] = None, + mab_time: Optional[int] = None, + rate: Optional[int] = None, + ) -> None: + """ + Set widget params + """ + if break_time is not None: + if break_time >= 128 or break_time < 9: + raise ValueError( + f"Break time must be in [9,128) range, provided {break_time}" + ) + self.break_time = break_time + if mab_time is not None: + if mab_time >= 128 or mab_time < 1: + raise ValueError( + f"MAB time must be in [1,128) range, provided {mab_time}" + ) + self.mab_time = mab_time + if rate is not None: + if rate >= 41 or rate < 0: + raise ValueError(f"DMX rate must be in [0,41) range, provided {rate}") + self.rate = rate + assert ( + self.break_time is not None + and self.mab_time is not None + and self.rate is not None + ) + data = ( + b"\x00\x00" + + to_bytes(self.break_time) + + to_bytes(self.mab_time) + + to_bytes(self.rate) + ) + self.send_message(MsgTypes.SET_WIDGET_PARAMS, data) + + def set_dmx( + self, + addr: int, + data: bytes, + addr_starting: int = 0, + ) -> None: + """ + Set dmx values that will be sent. + + If `auto_commit` is set to false, to provided configuration won't be + sent but memorized until the next call to `Widget.commit()` (or a call + to this function with `auto_commit = True`). + """ + true_addr = addr - addr_starting + if len(data) + true_addr > 512: + raise ValueError("Can't send more than 512 value over dmx") + self.dmx_staging[true_addr : true_addr + len(data)] = data + if self.auto_commit: + self.commit() + + def commit(self) -> None: + """Commit the staging DMX data and send it""" + self._dmx = bytes(self.dmx_staging) + self.send_message(MsgTypes.SEND_DMX, b"\x00" + self._dmx) + + def send_message(self, msg_type: MsgTypes, data: bytes) -> None: + """ + Lower level API for sending instruction to widget. + Prefer the use of `Widget.set_dmx` or `Widget.set_params`. + """ + if msg_type not in MsgTypes: + raise KeyError("Message type not known") + datalen = to_bytes(len(data), 2) + if not self._serial.is_open: + raise ValueError("Serial port is closed") + message = b"\x7E" + msg_type.value + datalen + data + b"\xE7" + logging.debug(f"[SERIAL] Sending {message}") + self._serial.write(message) + # self._serial.flush() + + def read_message(self) -> tuple[bytes, bytes, bool]: + """ + Low level API for reading message sent by the widget. + """ + current_byte = b"\x00" + # Flush until start byte + while current_byte != b"\x7E": + current_byte = self._serial.read(1) + logging.debug(f"[SERIAL] Read {current_byte}") + msg_type = self._serial.read(1) + logging.debug(f"[SERIAL] Read {msg_type}") + data_len_raw = self._serial.read(2) + logging.debug(f"[SERIAL] Read {data_len_raw}") + data_len = from_bytes(data_len_raw) + data = self._serial.read(data_len) + logging.debug(f"[SERIAL] Read {data}") + end_byte = self._serial.read(1) + logging.debug(f"[SERIAL] Read {end_byte}") + is_correct = end_byte == b"\xE7" + if not is_correct: + logging.warning("[SERIAL] Invalid message received") + return msg_type, data, is_correct diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..76ae9ae --- /dev/null +++ b/setup.py @@ -0,0 +1,11 @@ +from setuptools import setup + +setup( + name="pyjecteur", + version="2.0", + packages=["pyjecteur"], + install_requires=[ + "pyserial", + "colour", + ], +) diff --git a/shell.nix b/shell.nix new file mode 100644 index 0000000..c3efeff --- /dev/null +++ b/shell.nix @@ -0,0 +1,40 @@ +{ + nixpkgs ? (import ./npins).nixpkgs +, pkgs ? import nixpkgs {} +}: +let + python = pkgs.python310.override { + packageOverrides = self: super: { + pyjecteur = self.callPackage ./pyjecteur.nix {}; + }; + }; +in +{ + dev = pkgs.mkShell { + packages = [ + (pkgs.python310.withPackages (ps: [ + # build dep + ps.pyserial + ps.colour + + # dev dep + ps.black + ps.pylint + ps.ipython + ])) + pkgs.pyright + ]; + }; + prod = pkgs.mkShell { + packages = [ + (python.withPackages (ps: [ + ps.pyjecteur + + ps.ipython + ps.black + ps.pylint + ])) + pkgs.pyright + ]; + }; +}