#!/usr/bin/env python3 import pyvisa import time import pyaudio as pa import numpy as np import scipy as sc import matplotlib.pyplot as plt current_phase = 0.0 normalized_f = 0.0 # in cycles per sample : f = normalized_f * f_sampling trans_len = 2000 intro = 0.5 * (1 - np.cos(2 * np.linspace(0, np.pi/2, trans_len))) outro = 0.5 * (1 + np.cos(2 * np.linspace(0, np.pi/2, trans_len))) intro = np.append(intro, np.ones(1024)) outro = np.append(outro, np.zeros(1024)) change_f = True trans_done = False new_f = 0.0 fade_time = 0 def audio_callback(in_data, frame_count, time_info, status): global fade_time global change_f global new_f global current_phase global normalized_f global trans_done if change_f == True: change_f = False trans_done = False fade_time = 0 if(fade_time != 2*trans_len): if(fade_time < trans_len): last_fade = fade_time + frame_count envelope = outro[fade_time:last_fade] fade_time = min(last_fade, trans_len) elif(fade_time >= trans_len and fade_time < 2*trans_len): normalized_f = new_f if(new_f == 0): current_phase = 0. # Avoid weird sub-audio behavior last_fade = fade_time + frame_count envelope = intro[fade_time - trans_len:last_fade - trans_len] fade_time = min(last_fade, 2*trans_len) if(fade_time == 2*trans_len): trans_done = True sine = np.sin(2 * np.pi * (current_phase + np.arange(frame_count)*normalized_f)) delta = frame_count*normalized_f delta -= np.floor(delta) if(delta + current_phase > 1): current_phase += delta - 1 else: current_phase += delta if(fade_time != 2*trans_len): sine *= envelope return (sine.astype(np.float32).tobytes(), pa.paContinue) paudio = pa.PyAudio() audio_strm = paudio.open(format=pa.paFloat32, channels=1, rate=44100, output=True, input=False, stream_callback=audio_callback, start=True) print(paudio.get_default_output_device_info()) print(audio_strm.is_active()) def acquire_samples(sc_frequency, oscillo): global new_f global change_f global trans_done new_f = sc_frequency change_f = True while (change_f == True) or (trans_done == False): time.sleep(0.05) time.sleep(0.2) oscillo.write("acquire:state run") oscillo.write("data:source CH1") input_ch = oscillo.query_binary_values("curve?",datatype="b",container=np.array).astype(np.float64) oscillo.write("data:source CH2") output_ch = oscillo.query_binary_values("curve?",datatype="b",container=np.array).astype(np.float64) return input_ch, output_ch def manage_measures(f_list, oscillo): # These are NOT normalized frequencies hscales = [0.25, 100e-3, 50e-3, 25e-3, 10e-3, 5e-3, 2.5e-3, 1e-3, 500e-6, 250e-6, 100e-6, 50e-6, 25e-6, 10e-6] hscales_str = ["2.5e-1", "1.0e-1", "5.0e-2", "2.5e-2", "1.0e-2", "5.0e-3", "2.5e-3", "1.0e-3", "5.0e-4", "2.5e-4", "1.0e-4", "5.0e-5", "2.5e-5", "1.0e-5"] hscale_index = 0 current_hscale = hscales[hscale_index] cal_A = [] cal_B = [] cal_done = False H = [] W = sc.signal.windows.flattop(2500) for f in f_list: sc_frequency = np.round(2**20/44100 * f)/2**20 num_periods = f * 10 * current_hscale while(num_periods >= 5 * 2.5): hscale_index += 1 current_hscale = hscales[hscale_index] num_periods = f * 10 * current_hscale print("horizontal:main:scale " + hscales_str[hscale_index]) oscillo.write("horizontal:main:scale " + hscales_str[hscale_index]) cal_done = False print(f, " : ", current_hscale,"s/div, ", num_periods, " periods") A, B = acquire_samples(sc_frequency, oscillo) if not cal_done: cal_A, cal_B = acquire_samples(0., oscillo) cal_done = True print(A) print(B) A, B = (A - cal_A)*W, (B - cal_B)*W f_acq = 250./current_hscale rel_f = f / f_acq # Obtain the Fourier transforms of A, B @ rel_f, with flattop windowing V = np.exp(-2.0j * np.pi * np.arange(2500) * rel_f) h = np.sum(B*V)/np.sum(A*V) print("H(", f, ") = ", h) H.append(h) return H def find_oscillo(): rm = pyvisa.ResourceManager() R = rm.list_resources() print(R) # if R.empty(): # print("No device found") # exit(1) return rm.open_resource(R[0]) osci = find_oscillo() osci.timeout = 10000 print(osci.query("*IDN?")) osci.write("*RST") time.sleep(10) osci.write("ch1:scale 2.5e-1") osci.write("ch1:bandwidth ON") osci.write("ch1:coupling AC") osci.write("select:ch1 1") osci.write("ch2:scale 2.5e-1") osci.write("ch2:bandwidth ON") osci.write("ch2:coupling AC") osci.write("select:ch2 1") osci.write("trigger:main:edge:source LINE") osci.write("trigger:main:mode NORMAL") osci.write("trigger:main:type EDGE") osci.write("acquire:mode sample") osci.write("acquire:stopafter sequence") osci.write("data:encdg RIBinary") decade = [1.0, 1.2, 1.5, 1.8, 2.2, 2.7, 3.3, 3.9, 4.7, 5.6, 6.8, 8.2] X = [68., 82] + list(np.array(decade)*([100.]*len(decade))) + list(np.array(decade) * ([1000.]*len(decade))) + [1e4, 1.2e4, 1.5e4, 1.8e4] H = manage_measures(X, osci) Y = 20 * np.log10(np.abs(H)) fig, axes = plt.subplots(nrows = 2, ncols=1, sharex = True) axes[0].semilogx(X,Y) axes[0].grid() axes[1].semilogx(X, np.angle(H, deg=True)) axes[1].grid() plt.show()