import socket import os import typing import time import numpy as np import matplotlib.pyplot as plt from matplotlib.colors import LinearSegmentedColormap def parse_speed(text): tl = text.split(b' ') return (2 * int(tl[0]) - 1) * float(tl[1]) class Simulator: # X_STEPS_PER_INCH = 4800 X_STEPS_PER_MM: float = 188.97 X_MOTOR_STEPS: float = 200 # Y_STEPS_PER_INCH = 4800 Y_STEPS_PER_MM: int = 188.97 Y_MOTOR_STEPS: int = 200 # Z_STEPS_PER_INCH = 4800 Z_STEPS_PER_MM: float = 188.97 Z_MOTOR_STEPS: int = 200 pos = np.array([0., 0., 0.]) speed = np.array([0., 0., 0.]) def __init__(self, bind): self.s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: os.remove(bind) except FileNotFoundError: pass self.s.bind(bind) self.s.setblocking(False) self.s.listen() while True: try: self.s, _ = self.s.accept() break except BlockingIOError: print("Waiting for connection") time.sleep(1) try: self.loop() except KeyboardInterrupt: # Just so everything is fine when quitting self.s.close() plt.ioff() try: os.remove(bind) except FileNotFoundError: pass def loop(self): while(True): with socket.SocketIO(self.s, 'r') as buffer: try: time.sleep(1e-5) self.simulate() req = buffer.readline() self.s.setblocking(True) match req: case b'request\n': print("At your service") self.request() # These are the functions that need to be implemented case b'realize\n': print("Contemplating Life") self.realize() case b'': break self.s.setblocking(False) except BlockingIOError: pass except OSError: pass def simulate(self): pass def realize(self): pass def request(self): pass class Douche: colors = [(125, 29, 211), (255, 229, 0)] cm = LinearSegmentedColormap.from_list( "Custom", colors, N=42 ) last_x = None last_y = None def __init__(self): # We will assume no points are `really` in 3D. plt.ion() fig = plt.figure() self.ax = fig.gca() self.xs = [] self.ys = [] self.line, = self.ax.plot(self.xs, self.ys)#, color=self.cm) def add_point(self, x, y): print(x, y) self.xs.append(x) self.ys.append(y) self.line.set_data(self.xs, self.ys) plt.draw() plt.show() class NaiveSimulator(Simulator, Douche): steps = [0, 0, 0] time_step = 0 last_update = 0. J = np.array([1., 1., 1.]) # Moment of inertia vector times 1/R f = np.array([0., 0., 0.]) # viscous friction coefficient G = np.array([1., 1., 1.]) # Motor gain : Gamma = G * delta_w # FIXME: this is not very realistic, this looks like a model of # an asynchronous motor, not a stepper. command_spd = [0., 0., 0.] def __init__(self, bind): self.last_update = time.time() # Custom Simulation Douche.__init__(self) # Prepare to show the simulation self.add_point(self.pos[0], self.pos[1]) Simulator.__init__(self, bind) # Simulator loop def simulate(self): lt = time.time() self.time_step = lt - self.last_update self.last_update = lt gamma = self.G*(self.command_spd - self.speed) self.speed += 1./self.J * (self.G - self.f*self.speed ) * self.time_step self.pos += self.speed * self.time_step self.steps = np.ceil(self.pos * np.array([self.X_STEPS_PER_MM, self.Y_STEPS_PER_MM, self.Z_STEPS_PER_MM])) self.add_point(self.pos[0], self.pos[1]) def request(self): self.s.send(f"{self.steps[0]}".encode()) self.s.send(f"{self.steps[1]}".encode()) self.s.send(f"{self.steps[2]}".encode()) def realize(self): print("I JUST REALIZED: MA VIE C'EST DE LA MERDE") x_v = parse_speed(socket.SocketIO(self.s, 'r').readline()) y_v = parse_speed(socket.SocketIO(self.s, 'r').readline()) z_v = parse_speed(socket.SocketIO(self.s, 'r').readline()) print("Everything parsed") self.command_spd = np.array([x_v, y_v, z_v]) self.s.send("Realized".encode()) simu = NaiveSimulator("socket.sock")