import socket import os import time import numpy as np import matplotlib.pyplot as plt def parse_speed(text): tl = text.split(b' ') return (2 * int(tl[0]) - 1) * float(tl[1]) class Simulator: # X_STEPS_PER_INCH = 4800 X_STEPS_PER_MM: float = 188.97 X_MOTOR_STEPS: float = 200 # Y_STEPS_PER_INCH = 4800 Y_STEPS_PER_MM: int = 188.97 Y_MOTOR_STEPS: int = 200 # Z_STEPS_PER_INCH = 4800 Z_STEPS_PER_MM: float = 188.97 Z_MOTOR_STEPS: int = 200 pos = np.array([0., 0., 0.]) speed = np.array([0., 0., 0.]) def __init__(self, bind): self.s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: os.remove(bind) except FileNotFoundError: pass self.s.bind(bind) self.s.setblocking(False) self.s.listen() while True: try: self.s, _ = self.s.accept() self.s.setblocking(False) break except BlockingIOError: print("Waiting for connection") time.sleep(1) try: self.loop() except KeyboardInterrupt: # Just so everything is fine when quitting self.s.close() plt.ioff() try: os.remove(bind) except FileNotFoundError: pass def loop(self): while True: with socket.SocketIO(self.s, 'r') as buffer: try: time.sleep(1e-5) self.simulate() req = buffer.readline() self.s.setblocking(True) match req: case b'request\n': self.request() # These are the functions that need to be implemented case b'realize\n': self.realize() case b'': break self.s.setblocking(False) except BlockingIOError: pass except OSError: pass def simulate(self): pass def realize(self): pass def request(self): pass class Douche: def __init__(self): # We will assume no points are `really` in 3D. plt.ion() self.fig = plt.figure() ax = self.fig.add_subplot(111) self.xs = [] self.ys = [] ax.set_xlim([0, 300]) ax.set_ylim([0, 300]) self.line, = ax.plot(self.xs, self.ys, c="#7d1dd3") def add_point(self, x, y): self.xs.append(x) self.ys.append(y) self.line.set_data(self.xs, self.ys) self.fig.canvas.draw() self.fig.canvas.flush_events() class NaiveSimulator(Simulator, Douche): steps = [0, 0, 0] time_step = 0 last_update = np.nan # We have on each axis # Jw' = Gamma(motor) - f0*w # which can be expressed as # Jv' = (R*Gamma)(motor) - f0*v J = np.array([.1, .1, .1]) # Moment of inertia vector f = np.array([0., 0., 0.]) # viscous friction coefficient G = np.array([1., 1., 1.]) # Motor gain : Gamma*R = G * delta_v # FIXME: this is not very realistic, this looks like a model of # an asynchronous motor, not a stepper. command_spd = [0., 0., 0.] def __init__(self, bind): self.last_update = time.time() # Custom Simulation Douche.__init__(self) # Prepare to show the simulation self.add_point(self.pos[0], self.pos[1]) Simulator.__init__(self, bind) # Simulator loop def simulate(self): lt = time.time() self.time_step = lt - self.last_update if(self.time_step == np.nan): return self.last_update = lt gamma = self.G*(self.command_spd - self.speed) self.pos += self.speed * self.time_step self.speed += 1./self.J * (gamma - self.f*self.speed) * self.time_step #print("timestep ", self.time_step) #print("gamma ", gamma) #print("speed ", self.speed) print("position ", self.pos) self.steps = np.ceil(self.pos * np.array([self.X_STEPS_PER_MM, self.Y_STEPS_PER_MM, self.Z_STEPS_PER_MM])) self.add_point(self.pos[0], self.pos[1]) def request(self): self.s.send(f"{int(self.steps[0] % self.X_MOTOR_STEPS)}\n".encode()) self.s.send(f"{int(self.steps[1] % self.Y_MOTOR_STEPS)}\n".encode()) self.s.send(f"{int(self.steps[2] % self.Z_MOTOR_STEPS)}\n".encode()) def realize(self): x_v = parse_speed(socket.SocketIO(self.s, 'r').readline()) y_v = parse_speed(socket.SocketIO(self.s, 'r').readline()) z_v = parse_speed(socket.SocketIO(self.s, 'r').readline()) self.command_spd = np.array([x_v, y_v, z_v]) print("New speeds : ", self.command_spd) #self.s.send("Realized\n".encode()) simu = NaiveSimulator("socket.sock")