Initial commit

This commit is contained in:
Constantin Gierczak--Galle 2023-12-09 16:31:17 +01:00
commit 9f14d6b46f
13 changed files with 1007 additions and 0 deletions

75
README.md Normal file
View file

@ -0,0 +1,75 @@
# `pyjecteur`
## Obtenir un nix-shell avec `pyjecteur` (équivalent d'un virtualenv mais avec nix)
Installer nix (`curl --proto '=https' --tlsv1.2 -sSf -L https://install.determinate.systems/nix | sh -s -- install`) puis executer
```
nix-shell -A prod
```
## Documentation rapide
### `pyjecteur.widget.Widget`
Contient la logique d'interaction avec le module DMX-USB. Pour l'utiliser, il
suffit d'instancier l'objet puis d'appler la méthode `set_dmx`:
```python
def set_dmx(self, addr: int, data: bytes, addr_starting: int = 0) -> None:
"""
Set dmx values that will be sent.
If `auto_commit` is set to false, to provided configuration won't be
sent but memorized until the next call to `Widget.commit()` (or a call
to this function with `auto_commit = True`).
"""
```
Pour des interactions à plus haut niveau, continuez de lire
### `pyjecteur.lights.Universe`
Objet représentant un Univers.
```python
class Universe:
"""Represents a DMX universe.
Manages the adress space and responsibles for sending DMX to widget when
`Universe.update_dmx()` is called.
"""
def __init__(self, widget):
"""
Initializes the Universe
widget must be a class providing `widget.set_dmx(data, address)`
"""
def register(self, light: "AbstractLight", address: int) -> None:
"""
Register a light at the specified address
"""
def update_dmx(self, light: "AbstractLight", data: Union[bytearray, bytes]) -> None:
"""
Update the dmx data of the specified light
"""
```
### Interéagir avec un projecteur
Globalement, il faut instancier l'objet représentant le projecteur puis
l'enregister sur son Univers (`Universe.register(light, address)`, ATTENTION
les addresses commencent à 0).
Ensuite il y a deux façon de changer les paramètres:
1. Changer les attributs (`projo.pan = 100`)
2. Changer les addresses directement avec la notation index (change la valeur
à l'addresse `addresse_projo + index`)
## Déclaration d'un nouveau type de projecteur
S'inspirer du fichier `pyjecteur/fixtures.py`

2
default.nix Normal file
View file

@ -0,0 +1,2 @@
{ pkgs ? import (import ./npins).nixpkgs {} }:
pkgs.python310Packages.callPackage ./pyjecteur.nix {}

47
npins/default.nix Normal file
View file

@ -0,0 +1,47 @@
# Generated by npins. Do not modify; will be overwritten regularly
let
data = builtins.fromJSON (builtins.readFile ./sources.json);
version = data.version;
mkSource = spec:
assert spec ? type; let
path =
if spec.type == "Git" then mkGitSource spec
else if spec.type == "GitRelease" then mkGitSource spec
else if spec.type == "PyPi" then mkPyPiSource spec
else if spec.type == "Channel" then mkChannelSource spec
else builtins.throw "Unknown source type ${spec.type}";
in
spec // { outPath = path; };
mkGitSource = { repository, revision, url ? null, hash, ... }:
assert repository ? type;
# At the moment, either it is a plain git repository (which has an url), or it is a GitHub/GitLab repository
# In the latter case, there we will always be an url to the tarball
if url != null then
(builtins.fetchTarball {
inherit url;
sha256 = hash; # FIXME: check nix version & use SRI hashes
})
else assert repository.type == "Git"; builtins.fetchGit {
url = repository.url;
rev = revision;
# hash = hash;
};
mkPyPiSource = { url, hash, ... }:
builtins.fetchurl {
inherit url;
sha256 = hash;
};
mkChannelSource = { url, hash, ... }:
builtins.fetchTarball {
inherit url;
sha256 = hash;
};
in
if version == 3 then
builtins.mapAttrs (_: mkSource) data.pins
else
throw "Unsupported format version ${toString version} in sources.json. Try running `npins upgrade`"

11
npins/sources.json Normal file
View file

@ -0,0 +1,11 @@
{
"pins": {
"nixpkgs": {
"type": "Channel",
"name": "nixpkgs-unstable",
"url": "https://releases.nixos.org/nixpkgs/nixpkgs-23.11pre536306.12bdeb01ff9e/nixexprs.tar.xz",
"hash": "00az00b7h8g4h0n6kvrmjykk70mz91dd5v9fy1fbxzszisvgl9d3"
}
},
"version": 3
}

19
pyjecteur.nix Normal file
View file

@ -0,0 +1,19 @@
{ lib , pkgs, stdenv, setuptoolsBuildHook, buildPythonPackage, pythonPackages , fetchFromGitHub }:
let
attrs = {
name = "pyjecteur";
version = "2.0";
doCheck = false;
src = ./. ;
passthru = {
wheel = buildPythonPackage (attrs // {
name = "pyjecteur-py3.whl";
installPhase = "mv dist/pyjecteur-2.0-py3-none-any.whl $out";
dontFixup = true;
doInstallCheck = false;
});
};
propagatedBuildInputs = [ pythonPackages.pyserial pythonPackages.colour ];
};
in
buildPythonPackage attrs

3
pyjecteur/__init__.py Normal file
View file

@ -0,0 +1,3 @@
"""
pyjecteur is a python framework for programming lights in DMX
"""

176
pyjecteur/dummy.py Normal file
View file

@ -0,0 +1,176 @@
import logging
from enum import Enum
from typing import Optional
class MsgTypes(Enum):
"""
Gadget message types
"""
REPROGRAM_FIRMWARE = b"\x01"
FLASH_FIRMWARE_PAGE = b"\x02"
GET_WIDGET_PARAMS = b"\x03"
SET_WIDGET_PARAMS = b"\x04"
RECEIVED_DMX = b"\x05"
SEND_DMX = b"\x06"
RECEIVE_DMX_ON_CHANGE = b"\x08"
RECEIVE_DMX_CHANGE_OF_STATE = b"\x09"
GET_SERIAL_NUMBER = b"\x0a"
def from_bytes(b: bytes) -> int:
"""
Convert bytes to int (endianness compatible with the Gadget)
"""
return int.from_bytes(b, "little")
def to_bytes(b: int, length: int = 1) -> bytes:
"""
Convert int to bytes (endianness compatible with the Gadget)
"""
return b.to_bytes(length, "little")
class Widget:
"""
Class managing the communication with the gadget
"""
def __init__(self, serial_device_path: Optional[str] = None) -> None:
self._dmx = bytes(512)
self.break_time: Optional[int] = None
self.mab_time: Optional[int] = None
self.rate: Optional[int] = None
self.firmware: tuple[Optional[int], Optional[int]] = (None, None)
self._auto_commit = True
self.dmx_staging = bytearray(512)
self.serial_device_path = serial_device_path
@property
def auto_commit(self) -> bool:
"""
Get `auto_commit` value, `True` if auto commit is enabled
"""
return self._auto_commit
@auto_commit.setter
def auto_commit(self, value: bool):
"""
Set `auto_commit` value
"""
self._auto_commit = value
if self._auto_commit:
self.commit()
@property
def is_open(self) -> bool:
return True
def __enter__(self):
return self
def _init_widget(self):
# Retrieve widget parameters
self.send_message(MsgTypes.GET_WIDGET_PARAMS, b"\x00\x00")
msg_type, data, _ = None, None, None
while msg_type != MsgTypes.GET_WIDGET_PARAMS.value:
msg_type, data, _ = self.read_message()
assert msg_type is not None and data is not None
self.break_time = data[2]
self.mab_time = data[3]
self.rate = data[4]
self.firmware = (data[1], data[0])
self.send_message(MsgTypes.SEND_DMX, b"\x00" + self._dmx)
logging.info("[WIDGET] Initialized widget.")
logging.info(f"[WIDGET] Break time: {self.break_time * 10.67}µs")
logging.info(f"[WIDGET] MAB time: {self.mab_time * 10.67}µs")
logging.info(f"[WIDGET] DMX rate: {self.rate}Hz")
logging.info(f"[WIDGET] Firmware v{self.firmware[0]}.{self.firmware[1]}")
def __exit__(self, exc_type, exc_value, traceback):
pass
def set_params(
self,
break_time: Optional[int] = None,
mab_time: Optional[int] = None,
rate: Optional[int] = None,
) -> None:
"""
Set widget params
"""
if break_time is not None:
if break_time >= 128 or break_time < 9:
raise ValueError(
f"Break time must be in [9,128) range, provided {break_time}"
)
self.break_time = break_time
if mab_time is not None:
if mab_time >= 128 or mab_time < 1:
raise ValueError(
f"MAB time must be in [1,128) range, provided {mab_time}"
)
self.mab_time = mab_time
if rate is not None:
if rate >= 41 or rate < 0:
raise ValueError(f"DMX rate must be in [0,41) range, provided {rate}")
self.rate = rate
assert (
self.break_time is not None
and self.mab_time is not None
and self.rate is not None
)
data = (
b"\x00\x00"
+ to_bytes(self.break_time)
+ to_bytes(self.mab_time)
+ to_bytes(self.rate)
)
self.send_message(MsgTypes.SET_WIDGET_PARAMS, data)
def set_dmx(
self,
addr: int,
data: bytes,
addr_starting: int = 0,
) -> None:
"""
Set dmx values that will be sent.
If `auto_commit` is set to false, to provided configuration won't be
sent but memorized until the next call to `Widget.commit()` (or a call
to this function with `auto_commit = True`).
"""
true_addr = addr - addr_starting
if len(data) + true_addr > 512:
raise ValueError("Can't send more than 512 value over dmx")
self.dmx_staging[true_addr : true_addr + len(data)] = data
if self.auto_commit:
self.commit()
def commit(self) -> None:
"""Commit the staging DMX data and send it"""
self._dmx = bytes(self.dmx_staging)
self.send_message(MsgTypes.SEND_DMX, b"\x00" + self._dmx)
def send_message(self, msg_type: MsgTypes, data: bytes) -> None:
"""
Lower level API for sending instruction to widget.
Prefer the use of `Widget.set_dmx` or `Widget.set_params`.
"""
if msg_type not in MsgTypes:
raise KeyError("Message type not known")
datalen = to_bytes(len(data), 2)
message = b"\x7E" + msg_type.value + datalen + data + b"\xE7"
logging.debug(f"[SERIAL] Sending {message}")
# self._serial.flush()
def read_message(self) -> tuple[bytes, bytes, bool]:
"""
Low level API for reading message sent by the widget.
"""
raise NotImplementedError()

109
pyjecteur/fixtures.py Normal file
View file

@ -0,0 +1,109 @@
""""""
from colour import Color
from .lights import AbstractLight
from .reactive import L, RBool, RInt, RList
class Strob(AbstractLight):
address_size = 2
freq = RInt(0, 1)
dim = RInt(0, 0)
class StrobInv(AbstractLight):
address_size = 2
freq = RInt(0, 0)
dim = RInt(0, 1)
class Wash(AbstractLight):
"""
Wash
"""
address_size = 14
pan = RInt(0, 0)
tilt = RInt(0, 1)
speed = RInt(0, 2)
red = RInt(0, 3)
green = RInt(0, 4)
blue = RInt(0, 5)
white = RInt(0, 6)
dimmer = RInt(255, 9)
shutter = RBool(True, 10, true_val=b"\x15")
zoom = RInt(0, 11)
class Tradi(AbstractLight):
"""
Tradi RGB
"""
address_size = 3
red = RInt(0, 0)
green = RInt(0, 1)
blue = RInt(0, 2)
class ParMiskin:
"""
Par 56 led
"""
address_size = 8
red = RInt(0, 0)
green = RInt(0, 1)
blue = RInt(0, 2)
dimmer = RInt(255, 7)
class ParLed(AbstractLight):
"""
Par Led Theatre
"""
address_size = 7
red = RInt(0, 0)
green = RInt(0, 1)
blue = RInt(0, 2)
dimmer = RInt(255, 6)
class Blinder(AbstractLight):
"""
Blinder
"""
address_size = 51
dimmer = RInt(255, 1)
flash = RInt(0, 2)
colors = RList(
[Color(rgb=(0, 0, 0)) for i in range(16)],
3,
3,
from_byte=lambda x: Color(f"#{x.hex()}"),
to_byte=lambda x: bytes.fromhex(x.hex_l[1:]),
)
class LedBar48Ch(AbstractLight):
"""
Led Bar addressed on 48 channels
"""
address_size = 48
colors = RList(
[Color(rgb=(0, 0, 0)) for i in range(16)],
0,
3,
from_byte=lambda x: Color(f"#{x.hex()}"),
to_byte=lambda x: bytes.fromhex(x.hex_l[1:]),
)

135
pyjecteur/lights.py Normal file
View file

@ -0,0 +1,135 @@
"""
Module providing class for handling fixtures and generating the appropriate DMX.
"""
from copy import deepcopy
from typing import Any, Callable, Optional, Union
from .reactive import BaseReactiveValue, ReactiveMixin
from .widget import Widget
class Universe:
"""Represents a DMX universe.
Manages the adress space and responsibles for sending DMX to widget when
`Universe.update_dmx()` is called.
"""
lights = {}
def __init__(self, widget):
"""
Initializes the Universe
widget must be a class providing `widget.set_dmx(data, address)`
"""
self.widget: Widget = widget
def register(self, light: "AbstractLight", address: int) -> None:
"""
Register a light at the specified address
"""
# TODO: add checks for address overlapping
self.lights[light] = address
light.register_universe(self)
light.update_dmx()
def update_dmx(self, light: "AbstractLight", data: Union[bytearray, bytes]) -> None:
"""
Update the dmx data of the specified light
"""
# TODO: add checks for length
self.widget.set_dmx(self.lights[light], data)
class AbstractLight:
"""
Abstract class for lights
"""
address_size: int = 0
def __init__(self):
self._universe: Optional[Universe] = None
# The dmx values
self._dmx: bytes = bytearray(self.address_size)
# dmx memory_view to change in O(1) the values
self._dmx_mv = memoryview(self._dmx)
# Dict holdin conversion functions for attr values to dmx:
# { attr_name => (address, length, converter function) }
self._attrs_to_dmx: dict[str, tuple[int, int, Callable[[Any], bytes]]] = {}
# List holding conversion functions from dmx bytes to attrs.
# [ ( "attr_name", dmx_addr, length, converter function ) ]
self._dmx_to_attrs: list[
Optional[tuple[str, int, int, Callable[[bytes], Any]]]
] = [None for _ in range(self.address_size)]
self._enable_auto_update: bool = False
for key, rValueObject in self.__class__.__dict__.items():
if isinstance(rValueObject, BaseReactiveValue):
# On copie la valeur
val = deepcopy(rValueObject.value)
if isinstance(val, ReactiveMixin):
val.light = self
val.key = key
self._attrs_to_dmx[key] = rValueObject.attr_to_dmx()
for i, length, callback in rValueObject.dmx_to_attr():
for k in range(i, i + length):
self._dmx_to_attrs[k] = (key, i, length, callback)
# Finally set the attributes to their value
setattr(self, key, val)
self._enable_auto_update: bool = True
def register_universe(self, universe: "Universe") -> None:
"""Assign a universe to this light"""
if self._universe is not None:
raise ValueError("Can't assign light to more than one universe")
self._universe = universe
def update_dmx(self) -> None:
"""
Method to be called when the DMX values may have changed.
This method sends DMX velues to the Universe. It is automatically
triggered by property assignments.
"""
if self._universe is not None and self._enable_auto_update:
self._universe.update_dmx(self, self._dmx)
def __setattr__(self, name: str, value: Any) -> None:
"""
Automatically update dmx when a fixture param is set
"""
self.__dict__[name] = value
if not name.startswith("_"):
self.attr_set_hook(name, value)
def attr_set_hook(self, name, value):
"""
Hook to be called when an attribute is set in order to update DMX
values
"""
if name in self._attrs_to_dmx:
# if the attr is linked to dmx, update self._dmx
position, length, converter = self._attrs_to_dmx[name]
self._dmx_mv[position : position + length] = converter(value)
self.update_dmx()
def __getitem__(self, key) -> bytes:
return self._dmx[key]
def __setitem__(self, key: int, value: bytes) -> None:
self._dmx_mv[key : key + 1] = value
if self._dmx_to_attrs[key] is not None:
attr, position, length, converter = self._dmx_to_attrs[
key
] # pyright: ignore
self._enable_auto_update = False
setattr(self, attr, converter(self._dmx[position : position + length]))
self._enable_auto_update = True
self.update_dmx()

174
pyjecteur/reactive.py Normal file
View file

@ -0,0 +1,174 @@
"""
Module holding classes for easy specification of fixtures attributes
"""
from typing import Any, Callable, Iterable, Optional
from .widget import from_bytes, to_bytes
class ReactiveMixin:
"""
Mixin for data types that need to update
"""
light = None
key: Optional[str] = None
class BaseReactiveValue:
"""
Abstract class for defining the parsers between DMX values and object attributes
"""
address: Optional[list[int]] = None
value: Any = None
def attr_to_dmx(self) -> tuple[int, int, Callable[[Any], bytes]]:
"""
Fonction qui retourne les informations pour convertir une valeur
d'attribut en DMX sous la forme:
```
(addresse, longueur, fonction_de_conversion)
```
"""
raise NotImplementedError()
def dmx_to_attr(self) -> Iterable[tuple[int, int, Callable[[bytes], Any]]]:
"""
Fonction qui retourne les informations pour mettre à jour les attributs
si le DMX a changé sous la forme
```
list((addresse, longueur, fonction_de_conversion))
```
"""
raise NotImplementedError()
class RInt(BaseReactiveValue):
"""
Int light attribute
"""
def __init__(self, value: int, address: int, length: int = 1):
self.value: int = value
self.address: int = address
self.length: int = length
def dmx_to_attr(self):
return [(self.address, self.length, lambda _, b: from_bytes(b))]
def attr_to_dmx(self):
return (self.address, self.length, lambda x: to_bytes(x, length=self.length))
class RBool(BaseReactiveValue):
"""
Boolean light attribute
"""
def __init__( # pylint: disable=too-many-arguments
self,
value,
address,
true_val: bytes = b"\xff",
false_val: bytes = b"\x00",
length=1,
):
self.value = value
self.address = address
self.length = length
self.true_val = true_val
self.false_val = false_val
def dmx_to_attr(self):
return [(self.address, self.length, lambda _, x: x == self.true_val)]
def attr_to_dmx(self):
return (
self.address,
self.length,
lambda x: self.true_val if x else self.false_val,
)
class L(ReactiveMixin): # ruff: disable=invalid-name
"""
Thin wrapper around lists to handle reactivity inside lists
"""
def __init__(
self,
val: list[Any],
):
self._val = val
def __len__(self):
return len(self._val)
def __getitem__(self, key):
return self._val[key]
def __setitem__(self, key, value):
self._val[key] = value
if self.light:
self.light.attr_set_hook(self.key, self)
def __iter__(self):
return self._val.__iter__()
class RList(BaseReactiveValue):
"""
List light attribute
"""
def __init__( # pylint: disable=too-many-arguments
self,
value: list[Any],
address: int,
unit_size: int = 1,
from_byte: Callable[[bytes], Any] = from_bytes,
to_byte: Callable[[Any], bytes] = to_bytes,
):
"""
value: L([]) object
address: DMX address
unit_size: size of one value in the list (number of DMX slots it takes)
from_bytes, to_byte: convert function from list value to dmx bytes
(and reciprocally)
"""
self.value = L(value)
self.address: int = address
self.unit_size = unit_size
self.length = len(value) * unit_size
self.from_byte = from_byte
self.to_byte = to_byte
def dmx_to_attr(self):
def parser_factory(i):
"""
Factory to create functions that updates i-th value of list
"""
def parser(value, bytes_val):
value._val[i] = self.from_byte(bytes_val)
return value
return parser
return [
(
self.address + i * self.unit_size,
self.unit_size,
parser_factory(i),
)
for i in range(len(self.value))
]
def attr_to_dmx(self):
return (
self.address,
self.length,
lambda x: b"".join([self.to_byte(i) for i in x]),
)

205
pyjecteur/widget.py Normal file
View file

@ -0,0 +1,205 @@
import logging
from enum import Enum
from typing import Optional
import serial
class MsgTypes(Enum):
"""
Gadget message types
"""
REPROGRAM_FIRMWARE = b"\x01"
FLASH_FIRMWARE_PAGE = b"\x02"
GET_WIDGET_PARAMS = b"\x03"
SET_WIDGET_PARAMS = b"\x04"
RECEIVED_DMX = b"\x05"
SEND_DMX = b"\x06"
RECEIVE_DMX_ON_CHANGE = b"\x08"
RECEIVE_DMX_CHANGE_OF_STATE = b"\x09"
GET_SERIAL_NUMBER = b"\x0a"
def from_bytes(b: bytes) -> int:
"""
Convert bytes to int (endianness compatible with the Gadget)
"""
return int.from_bytes(b, "little")
def to_bytes(b: int, length: int = 1) -> bytes:
"""
Convert int to bytes (endianness compatible with the Gadget)
"""
return b.to_bytes(length, "little")
class Widget:
"""
Class managing the communication with the gadget
"""
def __init__(self, serial_device_path: Optional[str] = None) -> None:
self._dmx = bytes(512)
self.break_time: Optional[int] = None
self.mab_time: Optional[int] = None
self.rate: Optional[int] = None
self.firmware: tuple[Optional[int], Optional[int]] = (None, None)
self._auto_commit = True
self.dmx_staging = bytearray(512)
self.serial_device_path = serial_device_path
self._serial = serial.Serial(self.serial_device_path, 115200)
if self._serial.is_open:
self._init_widget()
@property
def auto_commit(self) -> bool:
"""
Get `auto_commit` value, `True` if auto commit is enabled
"""
return self._auto_commit
@auto_commit.setter
def auto_commit(self, value: bool):
"""
Set `auto_commit` value
"""
self._auto_commit = value
if self._auto_commit:
self.commit()
@property
def is_open(self) -> bool:
return self._serial.is_open
def __enter__(self):
self._serial = serial.Serial(self.serial_device_path, 115200)
self._serial = self._serial.__enter__()
if self._serial.is_open:
self._init_widget()
return self
def _init_widget(self):
# Retrieve widget parameters
self.send_message(MsgTypes.GET_WIDGET_PARAMS, b"\x00\x00")
msg_type, data, _ = None, None, None
while msg_type != MsgTypes.GET_WIDGET_PARAMS.value:
msg_type, data, _ = self.read_message()
assert msg_type is not None and data is not None
self.break_time = data[2]
self.mab_time = data[3]
self.rate = data[4]
self.firmware = (data[1], data[0])
self.send_message(MsgTypes.SEND_DMX, b"\x00" + self._dmx)
logging.info("[WIDGET] Initialized widget.")
logging.info(f"[WIDGET] Break time: {self.break_time * 10.67}µs")
logging.info(f"[WIDGET] MAB time: {self.mab_time * 10.67}µs")
logging.info(f"[WIDGET] DMX rate: {self.rate}Hz")
logging.info(f"[WIDGET] Firmware v{self.firmware[0]}.{self.firmware[1]}")
def __exit__(self, exc_type, exc_value, traceback):
self._serial.__exit__(exc_type, exc_value, traceback)
def set_params(
self,
break_time: Optional[int] = None,
mab_time: Optional[int] = None,
rate: Optional[int] = None,
) -> None:
"""
Set widget params
"""
if break_time is not None:
if break_time >= 128 or break_time < 9:
raise ValueError(
f"Break time must be in [9,128) range, provided {break_time}"
)
self.break_time = break_time
if mab_time is not None:
if mab_time >= 128 or mab_time < 1:
raise ValueError(
f"MAB time must be in [1,128) range, provided {mab_time}"
)
self.mab_time = mab_time
if rate is not None:
if rate >= 41 or rate < 0:
raise ValueError(f"DMX rate must be in [0,41) range, provided {rate}")
self.rate = rate
assert (
self.break_time is not None
and self.mab_time is not None
and self.rate is not None
)
data = (
b"\x00\x00"
+ to_bytes(self.break_time)
+ to_bytes(self.mab_time)
+ to_bytes(self.rate)
)
self.send_message(MsgTypes.SET_WIDGET_PARAMS, data)
def set_dmx(
self,
addr: int,
data: bytes,
addr_starting: int = 0,
) -> None:
"""
Set dmx values that will be sent.
If `auto_commit` is set to false, to provided configuration won't be
sent but memorized until the next call to `Widget.commit()` (or a call
to this function with `auto_commit = True`).
"""
true_addr = addr - addr_starting
if len(data) + true_addr > 512:
raise ValueError("Can't send more than 512 value over dmx")
self.dmx_staging[true_addr : true_addr + len(data)] = data
if self.auto_commit:
self.commit()
def commit(self) -> None:
"""Commit the staging DMX data and send it"""
self._dmx = bytes(self.dmx_staging)
self.send_message(MsgTypes.SEND_DMX, b"\x00" + self._dmx)
def send_message(self, msg_type: MsgTypes, data: bytes) -> None:
"""
Lower level API for sending instruction to widget.
Prefer the use of `Widget.set_dmx` or `Widget.set_params`.
"""
if msg_type not in MsgTypes:
raise KeyError("Message type not known")
datalen = to_bytes(len(data), 2)
if not self._serial.is_open:
raise ValueError("Serial port is closed")
message = b"\x7E" + msg_type.value + datalen + data + b"\xE7"
logging.debug(f"[SERIAL] Sending {message}")
self._serial.write(message)
# self._serial.flush()
def read_message(self) -> tuple[bytes, bytes, bool]:
"""
Low level API for reading message sent by the widget.
"""
current_byte = b"\x00"
# Flush until start byte
while current_byte != b"\x7E":
current_byte = self._serial.read(1)
logging.debug(f"[SERIAL] Read {current_byte}")
msg_type = self._serial.read(1)
logging.debug(f"[SERIAL] Read {msg_type}")
data_len_raw = self._serial.read(2)
logging.debug(f"[SERIAL] Read {data_len_raw}")
data_len = from_bytes(data_len_raw)
data = self._serial.read(data_len)
logging.debug(f"[SERIAL] Read {data}")
end_byte = self._serial.read(1)
logging.debug(f"[SERIAL] Read {end_byte}")
is_correct = end_byte == b"\xE7"
if not is_correct:
logging.warning("[SERIAL] Invalid message received")
return msg_type, data, is_correct

11
setup.py Normal file
View file

@ -0,0 +1,11 @@
from setuptools import setup
setup(
name="pyjecteur",
version="2.0",
packages=["pyjecteur"],
install_requires=[
"pyserial",
"colour",
],
)

40
shell.nix Normal file
View file

@ -0,0 +1,40 @@
{
nixpkgs ? (import ./npins).nixpkgs
, pkgs ? import nixpkgs {}
}:
let
python = pkgs.python310.override {
packageOverrides = self: super: {
pyjecteur = self.callPackage ./pyjecteur.nix {};
};
};
in
{
dev = pkgs.mkShell {
packages = [
(pkgs.python310.withPackages (ps: [
# build dep
ps.pyserial
ps.colour
# dev dep
ps.black
ps.pylint
ps.ipython
]))
pkgs.pyright
];
};
prod = pkgs.mkShell {
packages = [
(python.withPackages (ps: [
ps.pyjecteur
ps.ipython
ps.black
ps.pylint
]))
pkgs.pyright
];
};
}