import socket import os import typing import time import numpy as np import matplotlib.pyplot as plt from matplotlib.colors import LinearSegmentedColormap def parse_speed(text): tl = text.split(' ') return int(tl[0]) * float(tl[1]) class Simulator: # X_STEPS_PER_INCH = 4800 X_STEPS_PER_MM: float = 188.97 X_MOTOR_STEPS: float = 200 # Y_STEPS_PER_INCH = 4800 Y_STEPS_PER_MM: int = 188.97 Y_MOTOR_STEPS: int = 200 # Z_STEPS_PER_INCH = 4800 Z_STEPS_PER_MM: float = 188.97 Z_MOTOR_STEPS: int = 200 pos = np.array([0., 0., 0.]) speed = np.array([0., 0., 0.]) def __init__(self, bind): self.s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: os.remove(bind) except FileNotFoundError: pass self.s.bind(bind) self.s.listen() while True: try: self.loop() except KeyboardInterrupt: # Just so everything is fine when quitting self.s.close() plt.ioff() try: os.remove(bind) except FileNotFoundError: pass def loop(self): req = socket.SocketIO(self.s, 'r').readline() match req: case 'request': print("At your service") self.request() # These are the functions that need to be implemented case 'realize': print("Contemplating Life") self.realize() print("Job Done") def realize(self): pass def request(self): pass class Douche: colors = [(125, 29, 211), (255, 229, 0)] cm = LinearSegmentedColormap.from_list( "Custom", colors, N=42 ) last_x = None last_y = None def __init__(self): # We will assume no points are `really` in 3D. plt.ion() fig = plt.figure() self.ax = fig.gca() self.xs = [] self.ys = [] self.line, = self.ax.plot(self.xs, self.ys, cmap=self.cm) def add_point(self, x, y): self.xs.append(x) self.ys.append(y) self.line.set_data(self.xs, self.ys) plt.draw() class NaiveSimulator(Simulator, Douche): steps = [0, 0, 0] time_step = 0 last_update = 0. alpha = 1. def __init__(self, bind): self.last_update = time.time() # Custom Simulation Douche.__init__(self) # Prepare to show the simulation self.add_point(self.pos[0], self.pos[1]) Simulator.__init__(self, bind) # Simulator loop def request(self): self.s.send(f"{self.steps[0]}".encode()) self.s.send(f"{self.steps[1]}".encode()) self.s.send(f"{self.steps[2]}".encode()) self.add_point(self.pos[0], self.pos[1]) def realize(self): print("I JUST REALIZED: MA VIE C'EST DE LA MERDE") lt = time.time() self.time_step = lt - self.last_update self.last_update = lt x_v = parse_speed(socket.SocketIO(self.s, 'r').readline()) y_v = parse_speed(socket.SocketIO(self.s, 'r').readline()) z_v = parse_speed(socket.SocketIO(self.s, 'r').readline()) inertia = - self.alpha * self.speed gamma_command = np.array([x_v, y_v, z_v]) - self.speed self.speed = self.time_step * (inertia + gamma_command) step_increment = np.ceil(self.speed * self.time_step * np.array([1 / self.X_MOTOR_STEPS, 1 / self.Y_MOTOR_STEPS, 1 / self.Z_MOTOR_STEPS])) self.steps = self.steps + step_increment self.pos = self.pos + np.array([self.X_STEPS_PER_MM * step_increment[0], self.Y_STEPS_PER_MM * step_increment[1], self.Z_STEPS_PER_MM * step_increment[2]]) self.s.send("Realized".encode())