init
This commit is contained in:
commit
2875997232
16 changed files with 1081 additions and 0 deletions
3
.gitignore
vendored
Normal file
3
.gitignore
vendored
Normal file
|
@ -0,0 +1,3 @@
|
|||
*.swp
|
||||
result
|
||||
__pycache__
|
BIN
API_DOC.pdf
Normal file
BIN
API_DOC.pdf
Normal file
Binary file not shown.
77
README.md
Normal file
77
README.md
Normal file
|
@ -0,0 +1,77 @@
|
|||
# `pyjecteur`
|
||||
|
||||
## Obtenir un nix-shell avec `pyjecteur` (équivalent d'un virtualenv mais avec nix)
|
||||
|
||||
Installer nix (`curl --proto '=https' --tlsv1.2 -sSf -L https://install.determinate.systems/nix | sh -s -- install`) puis executer
|
||||
|
||||
```
|
||||
nix-shell -A prod
|
||||
```
|
||||
|
||||
## Documentation rapide
|
||||
|
||||
### `pyjecteur.widget.Widget`
|
||||
|
||||
Contient la logique d'interaction avec le module DMX-USB. Pour l'utiliser, il
|
||||
suffit d'instancier l'objet puis d'appler la méthode `set_dmx`:
|
||||
```python
|
||||
def set_dmx(self, addr: int, data: bytes, addr_starting: int = 0) -> None:
|
||||
"""
|
||||
Set dmx values that will be sent.
|
||||
|
||||
If `auto_commit` is set to false, to provided configuration won't be
|
||||
sent but memorized until the next call to `Widget.commit()` (or a call
|
||||
to this function with `auto_commit = True`).
|
||||
"""
|
||||
```
|
||||
|
||||
Il existe aussi `pyjecteur.dummy.Widget` qui ne requiert pas de device physique.
|
||||
|
||||
Pour des interactions à plus haut niveau, continuez de lire
|
||||
|
||||
### `pyjecteur.lights.Universe`
|
||||
|
||||
Objet représentant un Univers.
|
||||
|
||||
```python
|
||||
class Universe:
|
||||
"""Represents a DMX universe.
|
||||
|
||||
Manages the adress space and responsibles for sending DMX to widget when
|
||||
`Universe.update_dmx()` is called.
|
||||
"""
|
||||
|
||||
def __init__(self, widget):
|
||||
"""
|
||||
Initializes the Universe
|
||||
|
||||
widget must be a class providing `widget.set_dmx(data, address)`
|
||||
"""
|
||||
|
||||
def register(self, light: "AbstractLight", address: int) -> None:
|
||||
"""
|
||||
Register a light at the specified address
|
||||
"""
|
||||
|
||||
def update_dmx(self, light: "AbstractLight", data: Union[bytearray, bytes]) -> None:
|
||||
"""
|
||||
Update the dmx data of the specified light
|
||||
"""
|
||||
```
|
||||
|
||||
### Interéagir avec un projecteur
|
||||
|
||||
Globalement, il faut instancier l'objet représentant le projecteur puis
|
||||
l'enregister sur son Univers (`Universe.register(light, address)`, ATTENTION
|
||||
les addresses commencent à 0).
|
||||
|
||||
Ensuite il y a deux façon de changer les paramètres:
|
||||
|
||||
1. Changer les attributs (`projo.pan = 100`)
|
||||
2. Changer les addresses directement avec la notation index (change la valeur
|
||||
à l'addresse `addresse_projo + index`)
|
||||
|
||||
|
||||
## Déclaration d'un nouveau type de projecteur
|
||||
|
||||
S'inspirer du fichier `pyjecteur/fixtures.py`
|
2
default.nix
Normal file
2
default.nix
Normal file
|
@ -0,0 +1,2 @@
|
|||
{ pkgs ? import (import ./npins).nixpkgs {} }:
|
||||
pkgs.python310Packages.callPackage ./pyjecteur.nix {}
|
47
npins/default.nix
Normal file
47
npins/default.nix
Normal file
|
@ -0,0 +1,47 @@
|
|||
# Generated by npins. Do not modify; will be overwritten regularly
|
||||
let
|
||||
data = builtins.fromJSON (builtins.readFile ./sources.json);
|
||||
version = data.version;
|
||||
|
||||
mkSource = spec:
|
||||
assert spec ? type; let
|
||||
path =
|
||||
if spec.type == "Git" then mkGitSource spec
|
||||
else if spec.type == "GitRelease" then mkGitSource spec
|
||||
else if spec.type == "PyPi" then mkPyPiSource spec
|
||||
else if spec.type == "Channel" then mkChannelSource spec
|
||||
else builtins.throw "Unknown source type ${spec.type}";
|
||||
in
|
||||
spec // { outPath = path; };
|
||||
|
||||
mkGitSource = { repository, revision, url ? null, hash, ... }:
|
||||
assert repository ? type;
|
||||
# At the moment, either it is a plain git repository (which has an url), or it is a GitHub/GitLab repository
|
||||
# In the latter case, there we will always be an url to the tarball
|
||||
if url != null then
|
||||
(builtins.fetchTarball {
|
||||
inherit url;
|
||||
sha256 = hash; # FIXME: check nix version & use SRI hashes
|
||||
})
|
||||
else assert repository.type == "Git"; builtins.fetchGit {
|
||||
url = repository.url;
|
||||
rev = revision;
|
||||
# hash = hash;
|
||||
};
|
||||
|
||||
mkPyPiSource = { url, hash, ... }:
|
||||
builtins.fetchurl {
|
||||
inherit url;
|
||||
sha256 = hash;
|
||||
};
|
||||
|
||||
mkChannelSource = { url, hash, ... }:
|
||||
builtins.fetchTarball {
|
||||
inherit url;
|
||||
sha256 = hash;
|
||||
};
|
||||
in
|
||||
if version == 3 then
|
||||
builtins.mapAttrs (_: mkSource) data.pins
|
||||
else
|
||||
throw "Unsupported format version ${toString version} in sources.json. Try running `npins upgrade`"
|
11
npins/sources.json
Normal file
11
npins/sources.json
Normal file
|
@ -0,0 +1,11 @@
|
|||
{
|
||||
"pins": {
|
||||
"nixpkgs": {
|
||||
"type": "Channel",
|
||||
"name": "nixpkgs-unstable",
|
||||
"url": "https://releases.nixos.org/nixpkgs/nixpkgs-24.11pre690413.1366d1af8f58/nixexprs.tar.xz",
|
||||
"hash": "0nlrrs22pf3dr7m55glxxfdg2qn6q9sjsd5dc7bvigasb1v3gw4j"
|
||||
}
|
||||
},
|
||||
"version": 3
|
||||
}
|
19
pyjecteur.nix
Normal file
19
pyjecteur.nix
Normal file
|
@ -0,0 +1,19 @@
|
|||
{ lib , pkgs, stdenv, setuptoolsBuildHook, buildPythonPackage, pythonPackages , fetchFromGitHub }:
|
||||
let
|
||||
attrs = {
|
||||
name = "pyjecteur";
|
||||
version = "2.0";
|
||||
doCheck = false;
|
||||
src = ./. ;
|
||||
passthru = {
|
||||
wheel = buildPythonPackage (attrs // {
|
||||
name = "pyjecteur-py3.whl";
|
||||
installPhase = "mv dist/pyjecteur-2.0-py3-none-any.whl $out";
|
||||
dontFixup = true;
|
||||
doInstallCheck = false;
|
||||
});
|
||||
};
|
||||
propagatedBuildInputs = [ pythonPackages.pyserial pythonPackages.colour ];
|
||||
};
|
||||
in
|
||||
buildPythonPackage attrs
|
3
pyjecteur/__init__.py
Normal file
3
pyjecteur/__init__.py
Normal file
|
@ -0,0 +1,3 @@
|
|||
"""
|
||||
pyjecteur is a python framework for programming lights in DMX
|
||||
"""
|
176
pyjecteur/dummy.py
Normal file
176
pyjecteur/dummy.py
Normal file
|
@ -0,0 +1,176 @@
|
|||
import logging
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
|
||||
class MsgTypes(Enum):
|
||||
"""
|
||||
Gadget message types
|
||||
"""
|
||||
|
||||
REPROGRAM_FIRMWARE = b"\x01"
|
||||
FLASH_FIRMWARE_PAGE = b"\x02"
|
||||
GET_WIDGET_PARAMS = b"\x03"
|
||||
SET_WIDGET_PARAMS = b"\x04"
|
||||
RECEIVED_DMX = b"\x05"
|
||||
SEND_DMX = b"\x06"
|
||||
RECEIVE_DMX_ON_CHANGE = b"\x08"
|
||||
RECEIVE_DMX_CHANGE_OF_STATE = b"\x09"
|
||||
GET_SERIAL_NUMBER = b"\x0a"
|
||||
|
||||
|
||||
def from_bytes(b: bytes) -> int:
|
||||
"""
|
||||
Convert bytes to int (endianness compatible with the Gadget)
|
||||
"""
|
||||
return int.from_bytes(b, "little")
|
||||
|
||||
|
||||
def to_bytes(b: int, length: int = 1) -> bytes:
|
||||
"""
|
||||
Convert int to bytes (endianness compatible with the Gadget)
|
||||
"""
|
||||
return b.to_bytes(length, "little")
|
||||
|
||||
|
||||
class Widget:
|
||||
"""
|
||||
Class managing the communication with the gadget
|
||||
"""
|
||||
|
||||
def __init__(self, serial_device_path: Optional[str] = None) -> None:
|
||||
self._dmx = bytes(512)
|
||||
self.break_time: Optional[int] = None
|
||||
self.mab_time: Optional[int] = None
|
||||
self.rate: Optional[int] = None
|
||||
self.firmware: tuple[Optional[int], Optional[int]] = (None, None)
|
||||
self._auto_commit = True
|
||||
|
||||
self.dmx_staging = bytearray(512)
|
||||
self.serial_device_path = serial_device_path
|
||||
|
||||
@property
|
||||
def auto_commit(self) -> bool:
|
||||
"""
|
||||
Get `auto_commit` value, `True` if auto commit is enabled
|
||||
"""
|
||||
return self._auto_commit
|
||||
|
||||
@auto_commit.setter
|
||||
def auto_commit(self, value: bool):
|
||||
"""
|
||||
Set `auto_commit` value
|
||||
"""
|
||||
self._auto_commit = value
|
||||
if self._auto_commit:
|
||||
self.commit()
|
||||
|
||||
@property
|
||||
def is_open(self) -> bool:
|
||||
return True
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def _init_widget(self):
|
||||
# Retrieve widget parameters
|
||||
self.send_message(MsgTypes.GET_WIDGET_PARAMS, b"\x00\x00")
|
||||
msg_type, data, _ = None, None, None
|
||||
while msg_type != MsgTypes.GET_WIDGET_PARAMS.value:
|
||||
msg_type, data, _ = self.read_message()
|
||||
assert msg_type is not None and data is not None
|
||||
|
||||
self.break_time = data[2]
|
||||
self.mab_time = data[3]
|
||||
self.rate = data[4]
|
||||
self.firmware = (data[1], data[0])
|
||||
self.send_message(MsgTypes.SEND_DMX, b"\x00" + self._dmx)
|
||||
logging.info("[WIDGET] Initialized widget.")
|
||||
logging.info(f"[WIDGET] Break time: {self.break_time * 10.67}µs")
|
||||
logging.info(f"[WIDGET] MAB time: {self.mab_time * 10.67}µs")
|
||||
logging.info(f"[WIDGET] DMX rate: {self.rate}Hz")
|
||||
logging.info(f"[WIDGET] Firmware v{self.firmware[0]}.{self.firmware[1]}")
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
pass
|
||||
|
||||
def set_params(
|
||||
self,
|
||||
break_time: Optional[int] = None,
|
||||
mab_time: Optional[int] = None,
|
||||
rate: Optional[int] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Set widget params
|
||||
"""
|
||||
if break_time is not None:
|
||||
if break_time >= 128 or break_time < 9:
|
||||
raise ValueError(
|
||||
f"Break time must be in [9,128) range, provided {break_time}"
|
||||
)
|
||||
self.break_time = break_time
|
||||
if mab_time is not None:
|
||||
if mab_time >= 128 or mab_time < 1:
|
||||
raise ValueError(
|
||||
f"MAB time must be in [1,128) range, provided {mab_time}"
|
||||
)
|
||||
self.mab_time = mab_time
|
||||
if rate is not None:
|
||||
if rate >= 41 or rate < 0:
|
||||
raise ValueError(f"DMX rate must be in [0,41) range, provided {rate}")
|
||||
self.rate = rate
|
||||
assert (
|
||||
self.break_time is not None
|
||||
and self.mab_time is not None
|
||||
and self.rate is not None
|
||||
)
|
||||
data = (
|
||||
b"\x00\x00"
|
||||
+ to_bytes(self.break_time)
|
||||
+ to_bytes(self.mab_time)
|
||||
+ to_bytes(self.rate)
|
||||
)
|
||||
self.send_message(MsgTypes.SET_WIDGET_PARAMS, data)
|
||||
|
||||
def set_dmx(
|
||||
self,
|
||||
addr: int,
|
||||
data: bytes,
|
||||
addr_starting: int = 0,
|
||||
) -> None:
|
||||
"""
|
||||
Set dmx values that will be sent.
|
||||
|
||||
If `auto_commit` is set to false, to provided configuration won't be
|
||||
sent but memorized until the next call to `Widget.commit()` (or a call
|
||||
to this function with `auto_commit = True`).
|
||||
"""
|
||||
true_addr = addr - addr_starting
|
||||
if len(data) + true_addr > 512:
|
||||
raise ValueError("Can't send more than 512 value over dmx")
|
||||
self.dmx_staging[true_addr : true_addr + len(data)] = data
|
||||
if self.auto_commit:
|
||||
self.commit()
|
||||
|
||||
def commit(self) -> None:
|
||||
"""Commit the staging DMX data and send it"""
|
||||
self._dmx = bytes(self.dmx_staging)
|
||||
self.send_message(MsgTypes.SEND_DMX, b"\x00" + self._dmx)
|
||||
|
||||
def send_message(self, msg_type: MsgTypes, data: bytes) -> None:
|
||||
"""
|
||||
Lower level API for sending instruction to widget.
|
||||
Prefer the use of `Widget.set_dmx` or `Widget.set_params`.
|
||||
"""
|
||||
if msg_type not in MsgTypes:
|
||||
raise KeyError("Message type not known")
|
||||
datalen = to_bytes(len(data), 2)
|
||||
message = b"\x7E" + msg_type.value + datalen + data + b"\xE7"
|
||||
logging.debug(f"[SERIAL] Sending {message}")
|
||||
# self._serial.flush()
|
||||
|
||||
def read_message(self) -> tuple[bytes, bytes, bool]:
|
||||
"""
|
||||
Low level API for reading message sent by the widget.
|
||||
"""
|
||||
raise NotImplementedError()
|
107
pyjecteur/fixtures.py
Normal file
107
pyjecteur/fixtures.py
Normal file
|
@ -0,0 +1,107 @@
|
|||
""""""
|
||||
from colour import Color
|
||||
|
||||
from .lights import AbstractLight
|
||||
from .reactive import RBool, RColor, RInt, RList
|
||||
|
||||
|
||||
class Strob(AbstractLight):
|
||||
address_size = 2
|
||||
freq = RInt(0, 1)
|
||||
dim = RInt(0, 0)
|
||||
|
||||
|
||||
class UVBar(AbstractLight):
|
||||
address_size = 3
|
||||
strob = RInt(0, 1)
|
||||
dim = RInt(0, 0)
|
||||
|
||||
|
||||
class StrobInv(AbstractLight):
|
||||
address_size = 2
|
||||
freq = RInt(0, 0)
|
||||
dim = RInt(0, 1)
|
||||
|
||||
|
||||
class Wash(AbstractLight):
|
||||
"""
|
||||
Wash
|
||||
"""
|
||||
|
||||
address_size = 14
|
||||
|
||||
pan = RInt(0, 0)
|
||||
tilt = RInt(0, 1)
|
||||
speed = RInt(0, 2)
|
||||
color = RColor(Color("black"), 3)
|
||||
white = RInt(0, 6)
|
||||
dimmer = RInt(255, 9)
|
||||
shutter = RBool(True, 10, true_val=b"\x15")
|
||||
zoom = RInt(0, 11)
|
||||
|
||||
|
||||
class Tradi(AbstractLight):
|
||||
"""
|
||||
Tradi RGB
|
||||
"""
|
||||
|
||||
address_size = 3
|
||||
|
||||
color = RColor(Color("black"), 0)
|
||||
|
||||
|
||||
class ParMKII(AbstractLight):
|
||||
"""
|
||||
Par 56 led
|
||||
"""
|
||||
|
||||
address_size = 8
|
||||
|
||||
color = RColor(Color("black"), 0)
|
||||
amber = RInt(0, 3)
|
||||
dimmer = RInt(255, 7)
|
||||
|
||||
|
||||
class ParLed(AbstractLight):
|
||||
"""
|
||||
Par Led Theatre
|
||||
"""
|
||||
|
||||
address_size = 7
|
||||
color = RColor(Color("black"), 0)
|
||||
|
||||
dimmer = RInt(255, 6)
|
||||
|
||||
|
||||
class Blinder(AbstractLight):
|
||||
"""
|
||||
Blinder
|
||||
"""
|
||||
|
||||
address_size = 51
|
||||
|
||||
dimmer = RInt(255, 1)
|
||||
flash = RInt(0, 2)
|
||||
colors = RList(
|
||||
[Color(rgb=(0, 0, 0)) for i in range(16)],
|
||||
3,
|
||||
3,
|
||||
from_byte=RColor.from_bytes,
|
||||
to_byte=RColor.to_bytes,
|
||||
)
|
||||
|
||||
|
||||
class LedBar48Ch(AbstractLight):
|
||||
"""
|
||||
Led Bar addressed on 48 channels
|
||||
"""
|
||||
|
||||
address_size = 48
|
||||
|
||||
colors = RList(
|
||||
[Color(rgb=(0, 0, 0)) for i in range(16)],
|
||||
0,
|
||||
3,
|
||||
from_byte=lambda x: Color(f"#{x.hex()}"),
|
||||
to_byte=lambda x: bytes.fromhex(x.hex_l[1:]),
|
||||
)
|
140
pyjecteur/lights.py
Normal file
140
pyjecteur/lights.py
Normal file
|
@ -0,0 +1,140 @@
|
|||
"""
|
||||
Module providing class for handling fixtures and generating the appropriate DMX.
|
||||
"""
|
||||
from copy import deepcopy
|
||||
from typing import Any, Callable, Optional, Union
|
||||
|
||||
from .reactive import BaseReactiveValue, ReactiveMixin
|
||||
from .widget import Widget
|
||||
|
||||
|
||||
class Universe:
|
||||
"""Represents a DMX universe.
|
||||
|
||||
Manages the adress space and responsibles for sending DMX to widget when
|
||||
`Universe.update_dmx()` is called.
|
||||
"""
|
||||
|
||||
lights = {}
|
||||
|
||||
def __init__(self, widget):
|
||||
"""
|
||||
Initializes the Universe
|
||||
|
||||
widget must be a class providing `widget.set_dmx(data, address)`
|
||||
"""
|
||||
self.widget: Widget = widget
|
||||
|
||||
def register(self, light: "AbstractLight", address: int) -> None:
|
||||
"""
|
||||
Register a light at the specified address
|
||||
"""
|
||||
# TODO: add checks for address overlapping
|
||||
self.lights[light] = address
|
||||
light.register_universe(self)
|
||||
light.update_dmx()
|
||||
|
||||
def update_dmx(self, light: "AbstractLight", data: Union[bytearray, bytes]) -> None:
|
||||
"""
|
||||
Update the dmx data of the specified light
|
||||
"""
|
||||
# TODO: add checks for length
|
||||
self.widget.set_dmx(self.lights[light], data)
|
||||
|
||||
|
||||
class AbstractLight:
|
||||
"""
|
||||
Abstract class for lights
|
||||
"""
|
||||
|
||||
address_size: int = 0
|
||||
|
||||
def __init__(self):
|
||||
self._universe: Optional[Universe] = None
|
||||
# The dmx values
|
||||
self._dmx: bytes = bytearray(self.address_size)
|
||||
# dmx memory_view to change in O(1) the values
|
||||
self._dmx_mv = memoryview(self._dmx)
|
||||
|
||||
# Dict holdin conversion functions for attr values to dmx:
|
||||
# { attr_name => (address, length, converter function) }
|
||||
self._attrs_to_dmx: dict[str, tuple[int, int, Callable[[Any], bytes]]] = {}
|
||||
|
||||
# List holding conversion functions from dmx bytes to attrs.
|
||||
# [ ( "attr_name", dmx_addr, length, converter function ) ]
|
||||
self._dmx_to_attrs: list[
|
||||
Optional[tuple[str, int, int, Callable[[bytes], Any]]]
|
||||
] = [None for _ in range(self.address_size)]
|
||||
|
||||
self._enable_auto_update: bool = False
|
||||
|
||||
for key, rValueObject in self.__class__.__dict__.items():
|
||||
if isinstance(rValueObject, BaseReactiveValue):
|
||||
# On copie la valeur
|
||||
val = deepcopy(rValueObject.value)
|
||||
if isinstance(val, ReactiveMixin):
|
||||
val.light = self
|
||||
val.key = key
|
||||
self._attrs_to_dmx[key] = rValueObject.attr_to_dmx()
|
||||
|
||||
for i, length, callback in rValueObject.dmx_to_attr():
|
||||
for k in range(i, i + length):
|
||||
self._dmx_to_attrs[k] = (key, i, length, callback)
|
||||
# Finally set the attributes to their value
|
||||
setattr(self, key, val)
|
||||
self._enable_auto_update: bool = True
|
||||
|
||||
def register_universe(self, universe: "Universe") -> None:
|
||||
"""Assign a universe to this light"""
|
||||
if self._universe is not None:
|
||||
raise ValueError("Can't assign light to more than one universe")
|
||||
self._universe = universe
|
||||
|
||||
def update_dmx(self) -> None:
|
||||
"""
|
||||
Method to be called when the DMX values may have changed.
|
||||
|
||||
This method sends DMX velues to the Universe. It is automatically
|
||||
triggered by property assignments.
|
||||
"""
|
||||
if self._universe is not None and self._enable_auto_update:
|
||||
self._universe.update_dmx(self, self._dmx)
|
||||
|
||||
def __setattr__(self, name: str, value: Any) -> None:
|
||||
"""
|
||||
Automatically update dmx when a fixture param is set
|
||||
"""
|
||||
if name in self.__dict__ and isinstance(self.__dict__[name], ReactiveMixin):
|
||||
self.__dict__[name].set_all(value)
|
||||
self.attr_set_hook(name, value)
|
||||
return
|
||||
|
||||
self.__dict__[name] = value
|
||||
if not name.startswith("_"):
|
||||
self.attr_set_hook(name, value)
|
||||
|
||||
def attr_set_hook(self, name, value):
|
||||
"""
|
||||
Hook to be called when an attribute is set in order to update DMX
|
||||
values
|
||||
"""
|
||||
if name in self._attrs_to_dmx:
|
||||
# if the attr is linked to dmx, update self._dmx
|
||||
position, length, converter = self._attrs_to_dmx[name]
|
||||
self._dmx_mv[position : position + length] = converter(value)
|
||||
self.update_dmx()
|
||||
|
||||
def __getitem__(self, key) -> bytes:
|
||||
return self._dmx[key]
|
||||
|
||||
def __setitem__(self, key: int, value: bytes) -> None:
|
||||
self._dmx_mv[key : key + 1] = value
|
||||
|
||||
if self._dmx_to_attrs[key] is not None:
|
||||
attr, position, length, converter = self._dmx_to_attrs[
|
||||
key
|
||||
] # pyright: ignore
|
||||
self._enable_auto_update = False
|
||||
setattr(self, attr, converter(self._dmx[position : position + length]))
|
||||
self._enable_auto_update = True
|
||||
self.update_dmx()
|
217
pyjecteur/reactive.py
Normal file
217
pyjecteur/reactive.py
Normal file
|
@ -0,0 +1,217 @@
|
|||
"""
|
||||
Module holding classes for easy specification of fixtures attributes
|
||||
"""
|
||||
|
||||
from typing import Any, Callable, Iterable, Optional
|
||||
|
||||
from colour import Color
|
||||
|
||||
from .widget import from_bytes, to_bytes
|
||||
|
||||
|
||||
class ReactiveMixin:
|
||||
"""
|
||||
Mixin for data types that need to update
|
||||
"""
|
||||
|
||||
light = None
|
||||
key: Optional[str] = None
|
||||
|
||||
def set_all(self, new_val):
|
||||
raise NotImplementedError("Can't assign value here")
|
||||
|
||||
|
||||
class BaseReactiveValue:
|
||||
"""
|
||||
Abstract class for defining the parsers between DMX values and object attributes
|
||||
"""
|
||||
|
||||
address: Optional[list[int]] = None
|
||||
value: Any = None
|
||||
|
||||
def attr_to_dmx(self) -> tuple[int, int, Callable[[Any], bytes]]:
|
||||
"""
|
||||
Fonction qui retourne les informations pour convertir une valeur
|
||||
d'attribut en DMX sous la forme:
|
||||
```
|
||||
(addresse, longueur, fonction_de_conversion)
|
||||
```
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def dmx_to_attr(self) -> Iterable[tuple[int, int, Callable[[bytes], Any]]]:
|
||||
"""
|
||||
Fonction qui retourne les informations pour mettre à jour les attributs
|
||||
si le DMX a changé sous la forme
|
||||
```
|
||||
list((addresse, longueur, fonction_de_conversion))
|
||||
```
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class RInt(BaseReactiveValue):
|
||||
"""
|
||||
Int light attribute
|
||||
"""
|
||||
|
||||
def __init__(self, value: int, address: int, length: int = 1):
|
||||
self.value: int = value
|
||||
self.address: int = address
|
||||
self.length: int = length
|
||||
|
||||
def dmx_to_attr(self):
|
||||
return [(self.address, self.length, lambda _, b: from_bytes(b))]
|
||||
|
||||
def attr_to_dmx(self):
|
||||
return (self.address, self.length, lambda x: to_bytes(x, length=self.length))
|
||||
|
||||
|
||||
class RBool(BaseReactiveValue):
|
||||
"""
|
||||
Boolean light attribute
|
||||
"""
|
||||
|
||||
def __init__( # pylint: disable=too-many-arguments
|
||||
self,
|
||||
value,
|
||||
address,
|
||||
true_val: bytes = b"\xff",
|
||||
false_val: bytes = b"\x00",
|
||||
length=1,
|
||||
):
|
||||
self.value = value
|
||||
self.address = address
|
||||
self.length = length
|
||||
self.true_val = true_val
|
||||
self.false_val = false_val
|
||||
|
||||
def dmx_to_attr(self):
|
||||
return [(self.address, self.length, lambda _, x: x == self.true_val)]
|
||||
|
||||
def attr_to_dmx(self):
|
||||
return (
|
||||
self.address,
|
||||
self.length,
|
||||
lambda x: self.true_val if x else self.false_val,
|
||||
)
|
||||
|
||||
|
||||
class RColor(BaseReactiveValue):
|
||||
"""
|
||||
Boolean light attribute
|
||||
"""
|
||||
|
||||
def __init__( # pylint: disable=too-many-arguments
|
||||
self,
|
||||
value,
|
||||
address,
|
||||
):
|
||||
self.value = value
|
||||
self.address = address
|
||||
self.length = 3
|
||||
|
||||
@staticmethod
|
||||
def from_bytes(x):
|
||||
return Color(f"#{x.hex()}")
|
||||
|
||||
@staticmethod
|
||||
def to_bytes(x):
|
||||
return bytes.fromhex(x.hex_l[1:])
|
||||
|
||||
def dmx_to_attr(self):
|
||||
return [(self.address, self.length, lambda _, x: self.from_bytes(x))]
|
||||
|
||||
def attr_to_dmx(self):
|
||||
return (
|
||||
self.address,
|
||||
self.length,
|
||||
self.to_bytes,
|
||||
)
|
||||
|
||||
|
||||
class L(ReactiveMixin): # ruff: disable=invalid-name
|
||||
"""
|
||||
Thin wrapper around lists to handle reactivity inside lists
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
val: list[Any],
|
||||
):
|
||||
self._val = val
|
||||
|
||||
def __len__(self):
|
||||
return len(self._val)
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self._val[key]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self._val[key] = value
|
||||
if self.light:
|
||||
self.light.attr_set_hook(self.key, self)
|
||||
|
||||
def set_all(self, new_value):
|
||||
self._val = new_value
|
||||
if self.light:
|
||||
self.light.attr_set_hook(self.key, self)
|
||||
|
||||
def __iter__(self):
|
||||
return self._val.__iter__()
|
||||
|
||||
|
||||
class RList(BaseReactiveValue):
|
||||
"""
|
||||
List light attribute
|
||||
"""
|
||||
|
||||
def __init__( # pylint: disable=too-many-arguments
|
||||
self,
|
||||
value: list[Any],
|
||||
address: int,
|
||||
unit_size: int = 1,
|
||||
from_byte: Callable[[bytes], Any] = from_bytes,
|
||||
to_byte: Callable[[Any], bytes] = to_bytes,
|
||||
):
|
||||
"""
|
||||
value: L([]) object
|
||||
address: DMX address
|
||||
unit_size: size of one value in the list (number of DMX slots it takes)
|
||||
from_bytes, to_byte: convert function from list value to dmx bytes
|
||||
(and reciprocally)
|
||||
"""
|
||||
self.value = L(value)
|
||||
self.address: int = address
|
||||
self.unit_size = unit_size
|
||||
self.length = len(value) * unit_size
|
||||
self.from_byte = from_byte
|
||||
self.to_byte = to_byte
|
||||
|
||||
def dmx_to_attr(self):
|
||||
def parser_factory(i):
|
||||
"""
|
||||
Factory to create functions that updates i-th value of list
|
||||
"""
|
||||
|
||||
def parser(value, bytes_val):
|
||||
value._val[i] = self.from_byte(bytes_val)
|
||||
return value
|
||||
|
||||
return parser
|
||||
|
||||
return [
|
||||
(
|
||||
self.address + i * self.unit_size,
|
||||
self.unit_size,
|
||||
parser_factory(i),
|
||||
)
|
||||
for i in range(len(self.value))
|
||||
]
|
||||
|
||||
def attr_to_dmx(self):
|
||||
return (
|
||||
self.address,
|
||||
self.length,
|
||||
lambda x: b"".join([self.to_byte(i) for i in x]),
|
||||
)
|
205
pyjecteur/widget.py
Normal file
205
pyjecteur/widget.py
Normal file
|
@ -0,0 +1,205 @@
|
|||
import logging
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
import serial
|
||||
|
||||
|
||||
class MsgTypes(Enum):
|
||||
"""
|
||||
Gadget message types
|
||||
"""
|
||||
|
||||
REPROGRAM_FIRMWARE = b"\x01"
|
||||
FLASH_FIRMWARE_PAGE = b"\x02"
|
||||
GET_WIDGET_PARAMS = b"\x03"
|
||||
SET_WIDGET_PARAMS = b"\x04"
|
||||
RECEIVED_DMX = b"\x05"
|
||||
SEND_DMX = b"\x06"
|
||||
RECEIVE_DMX_ON_CHANGE = b"\x08"
|
||||
RECEIVE_DMX_CHANGE_OF_STATE = b"\x09"
|
||||
GET_SERIAL_NUMBER = b"\x0a"
|
||||
|
||||
|
||||
def from_bytes(b: bytes) -> int:
|
||||
"""
|
||||
Convert bytes to int (endianness compatible with the Gadget)
|
||||
"""
|
||||
return int.from_bytes(b, "little")
|
||||
|
||||
|
||||
def to_bytes(b: int, length: int = 1) -> bytes:
|
||||
"""
|
||||
Convert int to bytes (endianness compatible with the Gadget)
|
||||
"""
|
||||
return b.to_bytes(length, "little")
|
||||
|
||||
|
||||
class Widget:
|
||||
"""
|
||||
Class managing the communication with the gadget
|
||||
"""
|
||||
|
||||
def __init__(self, serial_device_path: Optional[str] = None) -> None:
|
||||
self._dmx = bytes(512)
|
||||
self.break_time: Optional[int] = None
|
||||
self.mab_time: Optional[int] = None
|
||||
self.rate: Optional[int] = None
|
||||
self.firmware: tuple[Optional[int], Optional[int]] = (None, None)
|
||||
self._auto_commit = True
|
||||
|
||||
self.dmx_staging = bytearray(512)
|
||||
self.serial_device_path = serial_device_path
|
||||
self._serial = serial.Serial(self.serial_device_path, 115200)
|
||||
if self._serial.is_open:
|
||||
self._init_widget()
|
||||
|
||||
@property
|
||||
def auto_commit(self) -> bool:
|
||||
"""
|
||||
Get `auto_commit` value, `True` if auto commit is enabled
|
||||
"""
|
||||
return self._auto_commit
|
||||
|
||||
@auto_commit.setter
|
||||
def auto_commit(self, value: bool):
|
||||
"""
|
||||
Set `auto_commit` value
|
||||
"""
|
||||
self._auto_commit = value
|
||||
if self._auto_commit:
|
||||
self.commit()
|
||||
|
||||
@property
|
||||
def is_open(self) -> bool:
|
||||
return self._serial.is_open
|
||||
|
||||
def __enter__(self):
|
||||
self._serial = serial.Serial(self.serial_device_path, 115200)
|
||||
self._serial = self._serial.__enter__()
|
||||
if self._serial.is_open:
|
||||
self._init_widget()
|
||||
return self
|
||||
|
||||
def _init_widget(self):
|
||||
# Retrieve widget parameters
|
||||
self.send_message(MsgTypes.GET_WIDGET_PARAMS, b"\x00\x00")
|
||||
msg_type, data, _ = None, None, None
|
||||
while msg_type != MsgTypes.GET_WIDGET_PARAMS.value:
|
||||
msg_type, data, _ = self.read_message()
|
||||
assert msg_type is not None and data is not None
|
||||
|
||||
self.break_time = data[2]
|
||||
self.mab_time = data[3]
|
||||
self.rate = data[4]
|
||||
self.firmware = (data[1], data[0])
|
||||
self.send_message(MsgTypes.SEND_DMX, b"\x00" + self._dmx)
|
||||
logging.info("[WIDGET] Initialized widget.")
|
||||
logging.info(f"[WIDGET] Break time: {self.break_time * 10.67}µs")
|
||||
logging.info(f"[WIDGET] MAB time: {self.mab_time * 10.67}µs")
|
||||
logging.info(f"[WIDGET] DMX rate: {self.rate}Hz")
|
||||
logging.info(f"[WIDGET] Firmware v{self.firmware[0]}.{self.firmware[1]}")
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self._serial.__exit__(exc_type, exc_value, traceback)
|
||||
|
||||
def set_params(
|
||||
self,
|
||||
break_time: Optional[int] = None,
|
||||
mab_time: Optional[int] = None,
|
||||
rate: Optional[int] = None,
|
||||
) -> None:
|
||||
"""
|
||||
Set widget params
|
||||
"""
|
||||
if break_time is not None:
|
||||
if break_time >= 128 or break_time < 9:
|
||||
raise ValueError(
|
||||
f"Break time must be in [9,128) range, provided {break_time}"
|
||||
)
|
||||
self.break_time = break_time
|
||||
if mab_time is not None:
|
||||
if mab_time >= 128 or mab_time < 1:
|
||||
raise ValueError(
|
||||
f"MAB time must be in [1,128) range, provided {mab_time}"
|
||||
)
|
||||
self.mab_time = mab_time
|
||||
if rate is not None:
|
||||
if rate >= 41 or rate < 0:
|
||||
raise ValueError(f"DMX rate must be in [0,41) range, provided {rate}")
|
||||
self.rate = rate
|
||||
assert (
|
||||
self.break_time is not None
|
||||
and self.mab_time is not None
|
||||
and self.rate is not None
|
||||
)
|
||||
data = (
|
||||
b"\x00\x00"
|
||||
+ to_bytes(self.break_time)
|
||||
+ to_bytes(self.mab_time)
|
||||
+ to_bytes(self.rate)
|
||||
)
|
||||
self.send_message(MsgTypes.SET_WIDGET_PARAMS, data)
|
||||
|
||||
def set_dmx(
|
||||
self,
|
||||
addr: int,
|
||||
data: bytes,
|
||||
addr_starting: int = 0,
|
||||
) -> None:
|
||||
"""
|
||||
Set dmx values that will be sent.
|
||||
|
||||
If `auto_commit` is set to false, to provided configuration won't be
|
||||
sent but memorized until the next call to `Widget.commit()` (or a call
|
||||
to this function with `auto_commit = True`).
|
||||
"""
|
||||
true_addr = addr - addr_starting
|
||||
if len(data) + true_addr > 512:
|
||||
raise ValueError("Can't send more than 512 value over dmx")
|
||||
self.dmx_staging[true_addr : true_addr + len(data)] = data
|
||||
if self.auto_commit:
|
||||
self.commit()
|
||||
|
||||
def commit(self) -> None:
|
||||
"""Commit the staging DMX data and send it"""
|
||||
self._dmx = bytes(self.dmx_staging)
|
||||
self.send_message(MsgTypes.SEND_DMX, b"\x00" + self._dmx)
|
||||
|
||||
def send_message(self, msg_type: MsgTypes, data: bytes) -> None:
|
||||
"""
|
||||
Lower level API for sending instruction to widget.
|
||||
Prefer the use of `Widget.set_dmx` or `Widget.set_params`.
|
||||
"""
|
||||
if msg_type not in MsgTypes:
|
||||
raise KeyError("Message type not known")
|
||||
datalen = to_bytes(len(data), 2)
|
||||
if not self._serial.is_open:
|
||||
raise ValueError("Serial port is closed")
|
||||
message = b"\x7E" + msg_type.value + datalen + data + b"\xE7"
|
||||
logging.debug(f"[SERIAL] Sending {message}")
|
||||
self._serial.write(message)
|
||||
# self._serial.flush()
|
||||
|
||||
def read_message(self) -> tuple[bytes, bytes, bool]:
|
||||
"""
|
||||
Low level API for reading message sent by the widget.
|
||||
"""
|
||||
current_byte = b"\x00"
|
||||
# Flush until start byte
|
||||
while current_byte != b"\x7E":
|
||||
current_byte = self._serial.read(1)
|
||||
logging.debug(f"[SERIAL] Read {current_byte}")
|
||||
msg_type = self._serial.read(1)
|
||||
logging.debug(f"[SERIAL] Read {msg_type}")
|
||||
data_len_raw = self._serial.read(2)
|
||||
logging.debug(f"[SERIAL] Read {data_len_raw}")
|
||||
data_len = from_bytes(data_len_raw)
|
||||
data = self._serial.read(data_len)
|
||||
logging.debug(f"[SERIAL] Read {data}")
|
||||
end_byte = self._serial.read(1)
|
||||
logging.debug(f"[SERIAL] Read {end_byte}")
|
||||
is_correct = end_byte == b"\xE7"
|
||||
if not is_correct:
|
||||
logging.warning("[SERIAL] Invalid message received")
|
||||
return msg_type, data, is_correct
|
23
scripts/debug.py
Normal file
23
scripts/debug.py
Normal file
|
@ -0,0 +1,23 @@
|
|||
"""
|
||||
A script to debug dmx lines
|
||||
"""
|
||||
import logging
|
||||
|
||||
from pyjecteur.widget import MsgTypes, Widget
|
||||
|
||||
WINDOW_MIN = 130
|
||||
WINDOW_MAX = 150
|
||||
|
||||
logging.basicConfig(level=logging.DEBUG)
|
||||
logging.info("Starting")
|
||||
|
||||
w = Widget("/dev/ttyUSB0")
|
||||
|
||||
w.send_message(MsgTypes.RECEIVE_DMX_ON_CHANGE, b"\x00")
|
||||
|
||||
while True:
|
||||
m = w.read_message()
|
||||
if m[0] != MsgTypes.RECEIVED_DMX.value:
|
||||
logging.info("Not DMX")
|
||||
continue
|
||||
logging.info(f"DMX: {m[1][WINDOW_MIN:WINDOW_MAX]}")
|
11
setup.py
Normal file
11
setup.py
Normal file
|
@ -0,0 +1,11 @@
|
|||
from setuptools import setup
|
||||
|
||||
setup(
|
||||
name="pyjecteur",
|
||||
version="3.0",
|
||||
packages=["pyjecteur"],
|
||||
install_requires=[
|
||||
"pyserial",
|
||||
"colour",
|
||||
],
|
||||
)
|
40
shell.nix
Normal file
40
shell.nix
Normal file
|
@ -0,0 +1,40 @@
|
|||
{
|
||||
nixpkgs ? (import ./npins).nixpkgs
|
||||
, pkgs ? import nixpkgs {}
|
||||
}:
|
||||
let
|
||||
python = pkgs.python3.override {
|
||||
packageOverrides = self: super: {
|
||||
pyjecteur = self.callPackage ./pyjecteur.nix {};
|
||||
};
|
||||
};
|
||||
in
|
||||
{
|
||||
dev = pkgs.mkShell {
|
||||
packages = [
|
||||
(pkgs.python310.withPackages (ps: [
|
||||
# build dep
|
||||
ps.pyserial
|
||||
ps.colour
|
||||
|
||||
# dev dep
|
||||
ps.black
|
||||
ps.pylint
|
||||
ps.ipython
|
||||
]))
|
||||
pkgs.pyright
|
||||
];
|
||||
};
|
||||
prod = pkgs.mkShell {
|
||||
packages = [
|
||||
(python.withPackages (ps: [
|
||||
ps.pyjecteur
|
||||
|
||||
ps.ipython
|
||||
ps.black
|
||||
ps.pylint
|
||||
]))
|
||||
pkgs.pyright
|
||||
];
|
||||
};
|
||||
}
|
Loading…
Reference in a new issue