267 lines
7.4 KiB
Python
Executable file
267 lines
7.4 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import pyvisa
|
|
import time
|
|
|
|
import pyaudio as pa
|
|
import numpy as np
|
|
import scipy as sc
|
|
import matplotlib.pyplot as plt
|
|
|
|
current_phase = 0.0
|
|
normalized_f = 0.0 # in cycles per sample : f = normalized_f * f_sampling
|
|
|
|
trans_len = 2000
|
|
intro = 0.5 * (1 - np.cos(2 * np.linspace(0, np.pi/2, trans_len)))
|
|
outro = 0.5 * (1 + np.cos(2 * np.linspace(0, np.pi/2, trans_len)))
|
|
|
|
intro = np.append(intro, np.ones(1024))
|
|
outro = np.append(outro, np.zeros(1024))
|
|
|
|
change_f = True
|
|
trans_done = False
|
|
new_f = 0.0
|
|
fade_time = 0
|
|
|
|
|
|
def audio_callback(in_data, frame_count, time_info, status):
|
|
global fade_time
|
|
global change_f
|
|
global new_f
|
|
global current_phase
|
|
global normalized_f
|
|
global trans_done
|
|
|
|
if change_f == True:
|
|
change_f = False
|
|
trans_done = False
|
|
fade_time = 0
|
|
|
|
if(fade_time != 2*trans_len):
|
|
if(fade_time < trans_len):
|
|
last_fade = fade_time + frame_count
|
|
envelope = outro[fade_time:last_fade]
|
|
fade_time = min(last_fade, trans_len)
|
|
elif(fade_time >= trans_len and fade_time < 2*trans_len):
|
|
normalized_f = new_f
|
|
if(new_f == 0):
|
|
current_phase = 0. # Avoid weird sub-audio behavior
|
|
last_fade = fade_time + frame_count
|
|
envelope = intro[fade_time - trans_len:last_fade - trans_len]
|
|
fade_time = min(last_fade, 2*trans_len)
|
|
|
|
if(fade_time == 2*trans_len):
|
|
trans_done = True
|
|
|
|
sine = np.sin(2 * np.pi * (current_phase + np.arange(frame_count)*normalized_f))
|
|
delta = frame_count*normalized_f
|
|
delta -= np.floor(delta)
|
|
|
|
if(delta + current_phase > 1):
|
|
current_phase += delta - 1
|
|
else:
|
|
current_phase += delta
|
|
|
|
if(fade_time != 2*trans_len):
|
|
sine *= envelope
|
|
|
|
return (sine.astype(np.float32).tobytes(), pa.paContinue)
|
|
|
|
paudio = pa.PyAudio()
|
|
|
|
audio_strm = paudio.open(format=pa.paFloat32,
|
|
channels=1,
|
|
rate=44100,
|
|
output=True,
|
|
input=False,
|
|
stream_callback=audio_callback,
|
|
start=True)
|
|
print(paudio.get_default_output_device_info())
|
|
print(audio_strm.is_active())
|
|
|
|
|
|
|
|
def acquire_samples(sc_frequency, oscillo):
|
|
global new_f
|
|
global change_f
|
|
global trans_done
|
|
|
|
new_f = sc_frequency
|
|
change_f = True
|
|
while (change_f == True) or (trans_done == False):
|
|
time.sleep(0.01)
|
|
time.sleep(0.1)
|
|
|
|
oscillo.write("acquire:state run")
|
|
oscillo.write("data:source CH1")
|
|
input_ch = oscillo.query_binary_values("curve?",datatype="b",container=np.array).astype(np.float64)
|
|
oscillo.write("data:source CH2")
|
|
output_ch = oscillo.query_binary_values("curve?",datatype="b",container=np.array).astype(np.float64)
|
|
|
|
return input_ch, output_ch
|
|
|
|
def manage_measures(f_list, oscillo): # These are NOT normalized frequencies
|
|
hscales = [0.25, 100e-3, 50e-3, 25e-3, 10e-3, 5e-3, 2.5e-3, 1e-3, 500e-6, 250e-6, 100e-6, 50e-6, 25e-6, 10e-6]
|
|
hscales_str = ["2.5e-1", "1.0e-1", "5.0e-2", "2.5e-2", "1.0e-2", "5.0e-3", "2.5e-3", "1.0e-3", "5.0e-4", "2.5e-4", "1.0e-4", "5.0e-5", "2.5e-5", "1.0e-5"]
|
|
|
|
hscale_index = 0
|
|
current_hscale = hscales[hscale_index]
|
|
|
|
|
|
vscales = [0.25, 100e-3, 50e-3, 25e-3]
|
|
vscales_str = ["2.5e-1", "1.0e-1", "5.0e-2", "2.5e-2"]
|
|
vscale_index = 0
|
|
current_vscale = vscales[vscale_index]
|
|
|
|
cal_A = []
|
|
cal_B = []
|
|
cal_done = False
|
|
|
|
H = []
|
|
|
|
W = sc.signal.windows.flattop(2500)
|
|
|
|
for f in f_list:
|
|
sc_frequency = np.round(2**20/44100 * f)/2**20
|
|
num_periods = f * 10 * current_hscale
|
|
update_hscale = False
|
|
while(num_periods >= 5 * 2.5):
|
|
hscale_index += 1
|
|
current_hscale = hscales[hscale_index]
|
|
num_periods = f * 10 * current_hscale
|
|
update_hscale = True
|
|
|
|
if update_hscale:
|
|
print("horizontal:main:scale " + hscales_str[hscale_index])
|
|
oscillo.write("horizontal:main:scale " + hscales_str[hscale_index])
|
|
cal_done = False
|
|
|
|
print(f, " : ", current_hscale,"s/div, ", num_periods, " periods")
|
|
A, B = acquire_samples(sc_frequency, oscillo)
|
|
|
|
is_sat, is_under = False, True
|
|
for x in B:
|
|
is_sat |= (abs(x) >= 127)
|
|
is_under &= ~(abs(x) >= 64)
|
|
|
|
while((is_sat and vscale_index != 0) or (is_under and vscale_index != len(vscales)-1)):
|
|
if(is_sat):
|
|
print("We have saturation")
|
|
vscale_index-=1
|
|
elif(is_under):
|
|
print("We are wasting bits")
|
|
vscale_index+=1
|
|
current_vscale = vscales[vscale_index]
|
|
osci.write("ch2:scale " + vscales_str[vscale_index])
|
|
cal_done = False
|
|
A, B = acquire_samples(sc_frequency, oscillo)
|
|
|
|
is_sat, is_under = False, True
|
|
for x in B:
|
|
is_sat |= (abs(x) >= 120)
|
|
is_under &= ~(abs(x) >= 64)
|
|
|
|
|
|
if not cal_done:
|
|
cal_A, cal_B = acquire_samples(0., oscillo)
|
|
cal_done = True
|
|
print(A)
|
|
print(B)
|
|
A, B = (A - cal_A)*W, (B - cal_B)*W
|
|
f_acq = 250./current_hscale
|
|
rel_f = f / f_acq
|
|
|
|
B *= current_vscale/vscales[0]
|
|
|
|
# Obtain the Fourier transforms of A, B @ rel_f, with flattop windowing
|
|
V = np.exp(-2.0j * np.pi * np.arange(2500) * rel_f)
|
|
h = np.sum(B*V)/np.sum(A*V)
|
|
print("H(", f, ") = ", h)
|
|
H.append(h)
|
|
return H
|
|
|
|
def find_oscillo():
|
|
rm = pyvisa.ResourceManager()
|
|
R = rm.list_resources()
|
|
print(R)
|
|
# if R.empty():
|
|
# print("No device found")
|
|
# exit(1)
|
|
return rm.open_resource(R[0])
|
|
|
|
|
|
osci = find_oscillo()
|
|
osci.timeout = 10000
|
|
print(osci.query("*IDN?"))
|
|
|
|
osci.write("*RST")
|
|
time.sleep(10)
|
|
|
|
osci.write("ch1:scale 2.5e-1")
|
|
osci.write("ch1:bandwidth ON")
|
|
osci.write("ch1:coupling AC")
|
|
osci.write("select:ch1 1")
|
|
|
|
osci.write("ch2:scale 2.5e-1")
|
|
osci.write("ch2:bandwidth ON")
|
|
osci.write("ch2:coupling AC")
|
|
osci.write("select:ch2 1")
|
|
|
|
|
|
osci.write("trigger:main:edge:source LINE")
|
|
osci.write("trigger:main:mode NORMAL")
|
|
osci.write("trigger:main:type EDGE")
|
|
|
|
osci.write("acquire:mode sample")
|
|
osci.write("acquire:stopafter sequence")
|
|
osci.write("data:encdg RIBinary")
|
|
|
|
|
|
def build_spoints(decade):
|
|
res = []
|
|
power_of_ten = 10
|
|
while(power_of_ten <= 100000):
|
|
for x in decade:
|
|
f = x * power_of_ten
|
|
if(60. <= f and f <= 20000.):
|
|
res.append(f)
|
|
power_of_ten *= 10
|
|
return res
|
|
|
|
|
|
e12 = [1.0, 1.2, 1.5, 1.8, 2.2, 2.7, 3.3, 3.9, 4.7, 5.6, 6.8, 8.2]
|
|
e24 = [1.0, 1.1, 1.2, 1.3, 1.5, 1.6, 1.8, 2.0, 2.2, 2.4, 2.7, 3.0, 3.3, 3.6, 3.9, 4.3, 4.7, 5.1, 5.6, 6.2, 6.8, 7.5, 8.2, 9.1]
|
|
e48 = [1.00, 1.05, 1.10, 1.15, 1.21, 1.27, 1.33, 1.40, 1.47, 1.54, 1.62, 1.69, 1.78, 1.87, 1.96, 2.05, 2.15, 2.26, 2.37, 2.49, 2.61, 2.74, 2.87, 3.01, 3.16, 3.32, 3.48, 3.65, 3.83, 4.02, 4.22, 4.42, 4.64, 4.87, 5.11, 5.36, 5.62, 5.90, 6.19, 6.49, 6.81, 7.15, 7.50, 7.87, 8.25, 8.66, 9.09, 9.53]
|
|
|
|
X = build_spoints(e24)
|
|
|
|
print(X)
|
|
|
|
H = manage_measures(X, osci)
|
|
Y = 20 * np.log10(np.abs(H))
|
|
|
|
phase = np.angle(H, deg=True)
|
|
for i in range(1, len(phase)):
|
|
while(abs(phase[i] - phase[i - 1]) >= 180):
|
|
if(phase[i] >= phase[i-1]):
|
|
phase[i] -= 360
|
|
else:
|
|
phase[i] += 360
|
|
|
|
print(X)
|
|
print(H)
|
|
|
|
print("Corrected phase/magnitude :")
|
|
print(phase)
|
|
print(Y)
|
|
|
|
fig, axes = plt.subplots(nrows = 2, ncols=1, sharex = True)
|
|
axes[0].semilogx(X,Y)
|
|
axes[0].grid()
|
|
axes[1].semilogx(X, phase)
|
|
axes[1].grid()
|
|
plt.show(block = False)
|
|
|
|
plt.figure(2)
|
|
plt.plot(X, phase)
|
|
plt.show()
|