VIANC/xfer_tracer/script.py

176 lines
4.2 KiB
Python
Raw Normal View History

2024-12-15 19:57:42 +01:00
#!/usr/bin/env python3
import pyvisa
import time
import pyaudio as pa
import numpy as np
2024-12-17 17:55:11 +01:00
import scipy as sc
current_phase = 0.0
normalized_f = 0.0 # in cycles per sample : f = normalized_f * f_sampling
trans_len = 2000
intro = 0.5 * (1 - np.cos(2 * np.linspace(0, np.pi/2, trans_len)))
outro = 0.5 * (1 + np.cos(2 * np.linspace(0, np.pi/2, trans_len)))
intro = np.append(intro, np.ones(1024))
outro = np.append(outro, np.zeros(1024))
change_f = True
trans_done = False
new_f = 0.0
fade_time = 0
2024-12-15 19:57:42 +01:00
def audio_callback(in_data, frame_count, time_info, status):
2024-12-17 17:55:11 +01:00
global fade_time
global change_f
global new_f
global current_phase
global normalized_f
global trans_done
if change_f == True:
change_f = False
trans_done = False
fade_time = 0
if(fade_time != 2*trans_len):
if(fade_time < trans_len):
last_fade = fade_time + frame_count
envelope = outro[fade_time:last_fade]
fade_time = min(last_fade, trans_len)
elif(fade_time >= trans_len and fade_time < 2*trans_len):
normalized_f = new_f
if(new_f == 0):
current_phase = 0. # Avoid weird sub-audio behavior
last_fade = fade_time + frame_count
envelope = intro[fade_time - trans_len:last_fade - trans_len]
fade_time = min(last_fade, 2*trans_len)
if(fade_time == 2*trans_len):
trans_done = True
sine = np.sin(2 * np.pi * (current_phase + np.arange(frame_count)*normalized_f))
delta = frame_count*normalized_f
delta -= np.floor(delta)
if(delta + current_phase > 1):
current_phase += delta - 1
else:
current_phase += delta
if(fade_time != 2*trans_len):
sine *= envelope
return (sine.astype(np.float32).tobytes(), pa.paContinue)
2024-12-15 19:57:42 +01:00
paudio = pa.PyAudio()
2024-12-17 17:55:11 +01:00
audio_strm = paudio.open(format=pa.paFloat32,
2024-12-15 19:57:42 +01:00
channels=1,
rate=48000,
output=True,
2024-12-17 17:55:11 +01:00
input=False,
stream_callback=audio_callback,
start=True)
def acquire_samples(sc_frequency, oscillo):
global new_f
global change_f
global trans_done
new_f = sc_frequency
change_f = True
while (change_f == True) or (trans_done == False):
time.sleep(0.05)
time.sleep(0.2)
oscillo.write("acquire:state run")
oscillo.write("data:source CH1")
input_ch = oscillo.query_binary("curve?", container=np.array)
oscillo.write("data:source CH2")
output_ch = oscillo.query_binary("curve?", container=np.array)
return input_ch, output_ch
#return (input_ch, output_ch)
def manage_measures(f_list, oscillo): # These are NOT normalized frequencies
hscales = [25e-3, 10e-3, 5e-3, 2.5e-3, 1e-3,
500e-6, 250e-6, 100e-6, 50e-6, 25e-6, 10e-6]
hscale_index = 0
current_hscale = hscales[hscale_index]
cal_A = []
cal_B = []
cal_done = False
H = []
W = sc.signal.windows.flattop(2500)
for f in f_list:
sc_frequency = np.round(2**20/48000 * f)/2**20
num_periods = f * 10 * current_hscale
while(num_periods >= 5 * 2.5):
hscale_index += 1
current_hscale = hscales[hscale_index]
num_periods = f * 10 * current_hscale
oscillo.write("horizontal:main:scale " + str(hscales))
cal_done = False
print(f, " : ", current_hscale,"s/div, ", num_periods, " periods")
A, B = acquire_samples(sc_frequency, oscillo)
if not cal_done:
cal_A, cal_B = acquire_samples(0., oscillo)
cal_done = True
A, B = (A - cal_A)*W, (B - cal_B)*W
f_acq = 250./current_hscale
rel_f = f / f_acq
# Obtain the Fourier transforms of A, B @ rel_f, with flattop windowing
V = np.exp(-2.0j * np.pi * np.arange(2500) * rel_f)
h = np.sum(B*V)/np.sum(A.V)
print("H(", f, ") = ", h)
H.append(h)
return H
def find_oscillo():
rm = pyvisa.ResourceManager()
R = rm.list_resources()
print(R)
# if R.empty():
# print("No device found")
# exit(1)
return rm.open_resource(R[0])
osci = find_oscillo()
print(osci.query("*IDN?"))
osci.write("*RST")
time.sleep(5)
osci.write("ch1:scale 1.0e-1")
osci.write("ch1:bandwidth ON")
osci.write("ch1:coupling AC")
osci.write("select:ch1 1")
osci.write("ch2:scale 5.0e-2")
osci.write("ch2:bandwidth ON")
osci.write("ch2:coupling AC")
osci.write("select:ch2 1")
osci.write("trigger:main:edge:source LINE")
osci.write("trigger:main:mode NORMAL")
osci.write("trigger:main:type EDGE")
osci.write("acquire:mode sample")
osci.write("acquire:stopafter sequence")
osci.write("data:encdg ascii")
manage_measures([330., 470., 680., 820., 1000.])
2024-12-15 19:57:42 +01:00