# File: frappy/frappy_psi/network_analysers/ZVL/ZVLDriver.py
# (204 lines, 7.1 KiB, Python)

# NWA mode: the instrument mode is selected with the SCPI command "INST:SEL NWA|SAN".
# NOTE: LXI needs to be enabled on the network analyzer, and DHCP needs to be on.
import vxi11
import numpy as np
import traceback
import time
class ZVLNetAnalyzer():
    '''Thin SCPI driver for a ZVL network analyzer accessed through `vxi11`.

    The open connection is kept in `self.instrument`. The most recently
    acquired sweep is kept in `self.base_data`; `get_data` compares new
    readings against it to decide whether the instrument has delivered
    fresh data yet.
    '''

    def __init__(self, ip=None):
        '''Connect to the analyzer, configure it and take a first sweep.

        Parameters
        ip: str or None. Address of the instrument. When None or '', the
            devices reported by `vxi11.list_devices()` are used, but only if
            exactly one device is reported.

        Raises
        Exception: no address was given and discovery did not report exactly
            one device.
        '''
        if ip is None or ip == '':
            # Query the network once and reuse the answer, so the count that
            # is checked and the list that is printed cannot disagree.
            devices = vxi11.list_devices()
            if len(devices) == 1:
                ip = devices[0]
                print('ZVL NA: Selecting ip='+ip)
            else:
                print('ZVL NA: Please provide an ip from the following:')
                print(devices)
                raise Exception(
                    'ZVL NA: no ip given and auto-discovery found '
                    f'{len(devices)} devices: {devices}'
                )
        self.instrument = vxi11.Instrument(ip)
        self.instrument.open()
        idn = self.instrument.ask('*IDN?')
        print(f'Identity: {idn}')
        self.reset()
        # get_data reads self.base_data, so it must exist before the first call.
        self.base_data = np.array([])
        # NOTE(review): get_data already stores the raw complex sweep in
        # self.base_data; this assignment replaces it with the returned
        # magnitudes (dB by default). Kept as in the original -- confirm
        # which of the two is the intended baseline.
        self.base_data = self.get_data()[1]

    def reset(self):
        '''Send the fixed configuration sequence used by this driver.

        Clears the status registers, selects instrument mode 2, enables
        display window 1, sets trace 'Trc1' to measure S11 in MLOG format,
        turns continuous sweeping off and sets the on-screen title.
        '''
        #self.instrument.write('*RST')
        #self.instrument.write('SYST:PRES') # reloads current setup.
        self.instrument.write('*CLS')
        # presumably mode 2 is the network-analyzer mode -- TODO confirm
        self.instrument.write('INST:NSEL 2')
        self.instrument.write('DISPlay:WINDow1:STATe ON')
        self.instrument.write(":CALC:PAR:MEAS 'Trc1', 'S11'")
        self.instrument.write('CALC:FORM MLOG')
        # Sweeps are started explicitly with 'INIT' in get_data.
        self.instrument.write('INIT:CONT OFF')
        self.instrument.write("SYST:USER:DISP:TITL 'Frappy connection'")
        #self.instrument.write('INIT:SCOP OFF')
        #self.instrument.write('DISPlay:WINDow2:STATe ON')

    def load_calibration(self, f):
        '''Store and then load the correction data named 'OSM1 <f>' for channel 1.

        Parameters
        f: value interpolated into the calibration name after 'OSM1 '.
        '''
        # NOTE(review): STORE is sent before LOAD, so whatever correction data
        # is active at that moment is written under this name first --
        # confirm that this order is intended.
        self.instrument.write(f":MMEMORY:STORE:CORR 1, 'OSM1 {f}'")  # put calibration in pool
        self.instrument.write(f":MMEMORY:LOAD:CORR 1, 'OSM1 {f}'")  # load from pool

    def set_freq_range(self, start, stop):
        '''Set the sweep start and stop frequencies. In Hz'''
        self.instrument.write(f'SENS1:FREQ:STAR {start}')
        self.instrument.write(f'SENS1:FREQ:STOP {stop}')

    def set_freq_span(self, center, span):
        '''Set the sweep center frequency and span. In Hz'''
        self.instrument.write(f'SENS1:FREQ:CENT {center}')
        self.instrument.write(f'SENS1:FREQ:SPAN {span}')

    def set_averaging_passes(self, avgs):
        '''Clear the averaging buffer, set the averaging count and enable averaging.

        Blocks for one second after sending the commands.

        Parameters
        avgs: int, between 1 and 1000 inclusive, the averaging count.

        Raises
        ValueError: avgs is outside 1..1000.
        '''
        # Explicit raise instead of assert: asserts vanish under `python -O`.
        if not 1 <= avgs <= 1000:
            raise ValueError(f'avgs must be between 1 and 1000 inclusive, got {avgs}')
        self.instrument.write('AVER:CLE')
        self.instrument.write(f'AVER:COUN {avgs}')
        self.instrument.write('AVER ON')
        time.sleep(1)

    def clear(self):
        '''Send the averaging-clear, trace-clear and 'CLE' commands.'''
        self.instrument.write('AVER:CLE')
        self.instrument.write('TRAC:CLE')
        self.instrument.write('CLE')

    def format_data(self, returned_from_device, complex=True):
        '''Parse a comma-separated instrument reply into a numpy array.

        Parameters
        returned_from_device: str, comma-separated numbers.
        complex: bool. When True, consecutive values are paired as
            (real, imaginary) and a complex array is returned; an unpaired
            trailing value is dropped. When False every value becomes one
            float.

        Returns
        numpy array of the parsed values.
        '''
        fields = returned_from_device.split(',')
        if complex:
            values = [
                float(re_part) + 1j*float(im_part)
                for re_part, im_part in zip(fields[::2], fields[1::2])
            ]
        else:
            values = [float(field) for field in fields]
        return np.array(values)

    def get_data(self, N=1000, units='dB', averaging_passes=1):
        '''Returns the frequencies, in Hz, and the magnitudes of S11, in units (see `units` parameter).

        Note: Testing shows that acquisition time is approx. 0.7ms (mostly
        linear) per datapoint. Max 1000*averaging_passes datapoints (700ms per
        averaging pass, really).

        The instrument is polled until the last sweep read differs from the
        previous acquisition (`self.base_data`) and is not all zeros; there is
        no timeout, so this blocks for as long as the instrument keeps
        returning unchanged data.

        Valid units are:
        dB: deciBels (power)
        unitless: simply magnitudes of S11

        Parameters
        N: int, between 2 and 1000 inc., the number of points to be returned. (default 1000)
        units: str, see above. Units/format of returned data.
        averaging_passes: int, describes the number of scans that will be taken and averaged. values can be from 1-999 inclusive (default 1)

        Returns
        (freqs, magnitudes): two numpy arrays.

        Raises
        ValueError: N, averaging_passes or units is outside the ranges above.
        '''
        # Explicit raises instead of asserts: asserts vanish under `python -O`.
        if not 2 <= N <= 1000:
            raise ValueError(f'N must be between 2 and 1000 inclusive, got {N}')
        if not 1 <= averaging_passes <= 999:
            raise ValueError(
                f'averaging_passes must be between 1 and 999 inclusive, got {averaging_passes}'
            )
        if units not in ('dB', 'unitless'):
            raise ValueError(f"units must be 'dB' or 'unitless', got {units!r}")

        self.instrument.write(f'SWE:POIN {N}')
        self.instrument.write(f'SWE:COUN {averaging_passes}')

        # Only the shape of this placeholder matters: it sizes the accumulator
        # for the first acquisition attempt.
        if N != self.base_data.shape[0]:
            data = np.zeros(N, dtype=np.complex128)
        else:
            data = np.copy(self.base_data)

        self.instrument.write('INIT')
        # Always acquire at least once, then repeat while the reading looks
        # stale. (The original pre-checked loop was skipped entirely when the
        # baseline sum was NaN, leaving the result undefined.)
        while True:
            total_data = np.zeros_like(data, dtype=np.complex128)
            for i in range(1, averaging_passes+1):
                data = self.format_data(self.instrument.ask(f'CALC:DATA:NSW? SDAT, {i}'))
                freqs = self.format_data(self.instrument.ask('CALC:DATA:STIM?'), complex=False)
                try:
                    total_data += data
                except ValueError:
                    # The reply length does not match the accumulator, so it
                    # cannot be added in place: restart the accumulation from
                    # this sweep. Copy it, otherwise the division below would
                    # also rescale `data`, which becomes self.base_data.
                    total_data = data.astype(np.complex128)
            total_data /= averaging_passes
            # Staleness heuristic: same sum as the previous acquisition, or
            # nothing but zeros, means no new sweep has arrived yet.
            unchanged = np.sum(data) == np.sum(self.base_data)
            if not (unchanged or np.all(data == 0)):
                break

        self.base_data = data
        if units == 'dB':
            # |total_data| is amplitudes. dB should be in terms of power
            total_data = np.log10(np.square(np.abs(total_data)))*10.0
        elif units == 'unitless':
            total_data = np.abs(total_data)
        return freqs, total_data

    def find_peak(self, range_start=None, range_end=None, scan_width=100_000):
        '''Scan a frequency range window by window and locate the lowest magnitude.

        The range is covered by consecutive windows of `scan_width` Hz,
        starting at `range_start`; the last window is shortened so that it
        ends at `range_end`. Each window is read with `get_data` at its
        defaults (1000 points, dB).

        Parameters
        range_start: start of the range in Hz (default 9_000).
        range_end: end of the range in Hz (default 13_600_000_000).
        scan_width: width of one window in Hz (default 100_000).

        Returns
        (min_mag, min_mag_index, full_record, full_record_fq):
        min_mag: lowest magnitude found. It starts at 0, so only values
            below 0 are registered; if there are none, 0 is returned.
        min_mag_index: index of min_mag in full_record (0 if none registered).
        full_record: magnitudes of all windows, concatenated.
        full_record_fq: the matching frequencies.

        Raises
        ValueError: scan_width is not positive or range_end < range_start.
        '''
        if range_start is None:
            range_start = 9_000
        if range_end is None:
            range_end = 13_600_000_000
        if scan_width <= 0:
            raise ValueError(f'scan_width must be positive, got {scan_width}')
        if range_end < range_start:
            raise ValueError(
                f'range_end ({range_end}) must not be below range_start ({range_start})'
            )

        N = 1000
        # Ceiling division: just enough windows to reach range_end, without an
        # extra window past it when the span is a multiple of scan_width.
        num_scans = max(1, int(-(-(range_end - range_start) // scan_width)))
        full_record = np.zeros(N*num_scans)
        full_record_fq = np.zeros(N*num_scans)
        min_mag = 0
        min_mag_index = 0

        for k in range(num_scans):
            # Window k starts k widths above range_start, so the first window
            # begins exactly at range_start (the original skipped it).
            s = range_start + k*scan_width
            e = s + scan_width
            if range_end > s:
                e = min(e, range_end)
            self.set_freq_range(s, e)
            freqs, data = self.get_data(N=N)
            mn = np.min(data)
            if mn < min_mag:
                min_mag = mn
                min_mag_index = np.argmin(data) + k*N
            full_record[k*N:(k+1)*N] = data
            full_record_fq[k*N:(k+1)*N] = freqs
        return min_mag, min_mag_index, full_record, full_record_fq
# Example code: locates a peak with find_peak and plots it, profiles the per-point delay for reading data, and compares single vs. averaged sweeps.
#ip = '169.254.150.182'
#import matplotlib.pyplot as plt
#print('start')
#z = ZVLNetAnalyzer(ip)
#mm, mmi, fr, frq = z.find_peak(50_000_000, 350_000_000, 20_000_000)
#plt.plot(frq, fr)
#plt.axvline(frq[mmi])
#plt.axhline(mm)
#plt.show()
#z.reset()
##z.set_freq_range(1_000_000, 2_000_000.5)
##z.set_freq_span(1_000_000, 10_000)
#z.set_freq_span(220_000_000, 50_000_000)
#Ns = np.linspace(3, 1000, 100).astype(int)
#ts = []
#for N in Ns:
# st = time.time()
# freqs, data = z.get_data(N)
# et = time.time()
# dt = (et-st)
# print(f'got data, {dt/N} ({dt})')
# ts += [dt]
#
#plt.scatter(Ns, ts)
#plt.show()
#plt.scatter(Ns, np.array(ts)/np.array(Ns))
#plt.show()
#input()
#plt.plot(*z.get_data(averaging_passes=1), alpha=0.3)
#plt.plot(*z.get_data(averaging_passes=64), alpha=0.3)
#plt.show()