frappy_psi/adq_mr (ultrasound): exit on reboot error message

otherwise the error message is confusing
+ remove CR from line endings in adq_mr.py

Change-Id: Ia465a26803a92677383969ff620ef35e58f1a5ec
This commit is contained in:
zolliker 2025-03-28 14:25:16 +01:00
parent 8560384529
commit 0ef484e082

View File

@ -1,396 +1,397 @@
# ***************************************************************************** # *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under # This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software # the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later # Foundation; either version 2 of the License, or (at your option) any later
# version. # version.
# #
# This program is distributed in the hope that it will be useful, but WITHOUT # This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details. # details.
# #
# You should have received a copy of the GNU General Public License along with # You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc., # this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# #
# Module authors: # Module authors:
# Damaris Tartarotti Maimone # Damaris Tartarotti Maimone
# Markus Zolliker <markus.zolliker@psi.ch> # Markus Zolliker <markus.zolliker@psi.ch>
# ***************************************************************************** # *****************************************************************************
"""Wrapper for the ADQ data acquisition card for ultrasound""" """Wrapper for the ADQ data acquisition card for ultrasound"""
import sys import sys
import atexit import atexit
import signal import signal
import time import time
import numpy as np import numpy as np
import ctypes as ct import ctypes as ct
from scipy.signal import butter, filtfilt from scipy.signal import butter, filtfilt
# For different trigger modes # For different trigger modes
SW_TRIG = 1 SW_TRIG = 1
# The following external trigger does not work if the level of the trigger is very close to 0.5V. # The following external trigger does not work if the level of the trigger is very close to 0.5V.
# Now we have it close to 3V, and it works # Now we have it close to 3V, and it works
EXT_TRIG_1 = 2 EXT_TRIG_1 = 2
EXT_TRIG_2 = 7 EXT_TRIG_2 = 7
EXT_TRIG_3 = 8 EXT_TRIG_3 = 8
LVL_TRIG = 3 LVL_TRIG = 3
INT_TRIG = 4 INT_TRIG = 4
LVL_FALLING = 0 LVL_FALLING = 0
LVL_RISING = 1 LVL_RISING = 1
ADQ_CLOCK_INT_INTREF = 0 # internal clock source ADQ_CLOCK_INT_INTREF = 0 # internal clock source
ADQ_CLOCK_EXT_REF = 1 # internal clock source, external reference ADQ_CLOCK_EXT_REF = 1 # internal clock source, external reference
ADQ_CLOCK_EXT_CLOCK = 2 # External clock source ADQ_CLOCK_EXT_CLOCK = 2 # External clock source
ADQ_TRANSFER_MODE_NORMAL = 0x00 ADQ_TRANSFER_MODE_NORMAL = 0x00
ADQ_CHANNELS_MASK = 0x3 ADQ_CHANNELS_MASK = 0x3
GHz = 1e9 GHz = 1e9
RMS_TO_VPP = 2 * np.sqrt(2) RMS_TO_VPP = 2 * np.sqrt(2)
class Timer: class Timer:
def __init__(self): def __init__(self):
self.data = [(time.time(), 'start')] self.data = [(time.time(), 'start')]
def __call__(self, text=''): def __call__(self, text=''):
now = time.time() now = time.time()
prev = self.data[-1][0] prev = self.data[-1][0]
self.data.append((now, text)) self.data.append((now, text))
return now - prev return now - prev
def summary(self): def summary(self):
return ' '.join(f'{txt} {tim:.3f}' for tim, txt in self.data[1:]) return ' '.join(f'{txt} {tim:.3f}' for tim, txt in self.data[1:])
def show(self): def show(self):
first = prev = self.data[0][0] first = prev = self.data[0][0]
print('---', first) print('---', first)
for tim, txt in self.data[1:]: for tim, txt in self.data[1:]:
print(f'{(tim - first) * 1000:9.3f} {(tim - prev) * 1000:9.3f} ms {txt}') print(f'{(tim - first) * 1000:9.3f} {(tim - prev) * 1000:9.3f} ms {txt}')
prev = tim prev = tim
class Adq: class Adq:
sample_rate = 2 * GHz sample_rate = 2 * GHz
max_number_of_channels = 2 max_number_of_channels = 2
ndecimate = 50 # decimation ratio (2GHz / 40 MHz) ndecimate = 50 # decimation ratio (2GHz / 40 MHz)
number_of_records = 1 number_of_records = 1
samples_per_record = 16384 samples_per_record = 16384
bw_cutoff = 10E6 bw_cutoff = 10E6
trigger = EXT_TRIG_1 trigger = EXT_TRIG_1
adq_num = 1 adq_num = 1
data = None data = None
busy = False busy = False
def __init__(self): def __init__(self):
global ADQAPI global ADQAPI
ADQAPI = ct.cdll.LoadLibrary("libadq.so.0") ADQAPI = ct.cdll.LoadLibrary("libadq.so.0")
ADQAPI.ADQAPI_GetRevision() ADQAPI.ADQAPI_GetRevision()
# Manually set return type from some ADQAPI functions # Manually set return type from some ADQAPI functions
ADQAPI.CreateADQControlUnit.restype = ct.c_void_p ADQAPI.CreateADQControlUnit.restype = ct.c_void_p
ADQAPI.ADQ_GetRevision.restype = ct.c_void_p ADQAPI.ADQ_GetRevision.restype = ct.c_void_p
ADQAPI.ADQ_GetPtrStream.restype = ct.POINTER(ct.c_int16) ADQAPI.ADQ_GetPtrStream.restype = ct.POINTER(ct.c_int16)
ADQAPI.ADQControlUnit_FindDevices.argtypes = [ct.c_void_p] ADQAPI.ADQControlUnit_FindDevices.argtypes = [ct.c_void_p]
# Create ADQControlUnit # Create ADQControlUnit
self.adq_cu = ct.c_void_p(ADQAPI.CreateADQControlUnit()) self.adq_cu = ct.c_void_p(ADQAPI.CreateADQControlUnit())
ADQAPI.ADQControlUnit_EnableErrorTrace(self.adq_cu, 3, '.') ADQAPI.ADQControlUnit_EnableErrorTrace(self.adq_cu, 3, '.')
# Find ADQ devices # Find ADQ devices
ADQAPI.ADQControlUnit_FindDevices(self.adq_cu) ADQAPI.ADQControlUnit_FindDevices(self.adq_cu)
n_of_adq = ADQAPI.ADQControlUnit_NofADQ(self.adq_cu) n_of_adq = ADQAPI.ADQControlUnit_NofADQ(self.adq_cu)
if n_of_adq != 1: if n_of_adq != 1:
print('number of ADQs must be 1, not %d' % n_of_adq) print('number of ADQs must be 1, not %d' % n_of_adq)
print('it seems the ADQ was not properly closed') print('it seems the ADQ was not properly closed')
raise RuntimeError('please try again or reboot') print('please try again or reboot')
atexit.register(self.deletecu) sys.exit(0)
signal.signal(signal.SIGTERM, lambda *_: sys.exit(0)) atexit.register(self.deletecu)
signal.signal(signal.SIGTERM, lambda *_: sys.exit(0))
rev = ADQAPI.ADQ_GetRevision(self.adq_cu, self.adq_num)
revision = ct.cast(rev, ct.POINTER(ct.c_int)) rev = ADQAPI.ADQ_GetRevision(self.adq_cu, self.adq_num)
out = [f'Connected to ADQ #1, FPGA Revision: {revision[0]}'] revision = ct.cast(rev, ct.POINTER(ct.c_int))
if revision[1]: out = [f'Connected to ADQ #1, FPGA Revision: {revision[0]}']
out.append('Local copy') if revision[1]:
else: out.append('Local copy')
if revision[2]: else:
out.append('SVN Managed - Mixed Revision') if revision[2]:
else: out.append('SVN Managed - Mixed Revision')
out.append('SVN Updated') else:
print(', '.join(out)) out.append('SVN Updated')
ADQAPI.ADQ_SetClockSource(self.adq_cu, self.adq_num, ADQ_CLOCK_EXT_REF) print(', '.join(out))
ADQAPI.ADQ_SetClockSource(self.adq_cu, self.adq_num, ADQ_CLOCK_EXT_REF)
##########################
# Test pattern ##########################
# ADQAPI.ADQ_SetTestPatternMode(self.adq_cu, self.adq_num, 4) # Test pattern
########################## # ADQAPI.ADQ_SetTestPatternMode(self.adq_cu, self.adq_num, 4)
# Sample skip ##########################
# ADQAPI.ADQ_SetSampleSkip(self.adq_cu, self.adq_num, 1) # Sample skip
########################## # ADQAPI.ADQ_SetSampleSkip(self.adq_cu, self.adq_num, 1)
##########################
# set trigger mode
if not ADQAPI.ADQ_SetTriggerMode(self.adq_cu, self.adq_num, self.trigger): # set trigger mode
raise RuntimeError('ADQ_SetTriggerMode failed.') if not ADQAPI.ADQ_SetTriggerMode(self.adq_cu, self.adq_num, self.trigger):
if self.trigger == LVL_TRIG: raise RuntimeError('ADQ_SetTriggerMode failed.')
if not ADQAPI.ADQ_SetLvlTrigLevel(self.adq_cu, self.adq_num, -100): if self.trigger == LVL_TRIG:
raise RuntimeError('ADQ_SetLvlTrigLevel failed.') if not ADQAPI.ADQ_SetLvlTrigLevel(self.adq_cu, self.adq_num, -100):
if not ADQAPI.ADQ_SetTrigLevelResetValue(self.adq_cu, self.adq_num, 1000): raise RuntimeError('ADQ_SetLvlTrigLevel failed.')
raise RuntimeError('ADQ_SetTrigLevelResetValue failed.') if not ADQAPI.ADQ_SetTrigLevelResetValue(self.adq_cu, self.adq_num, 1000):
if not ADQAPI.ADQ_SetLvlTrigChannel(self.adq_cu, self.adq_num, 1): raise RuntimeError('ADQ_SetTrigLevelResetValue failed.')
raise RuntimeError('ADQ_SetLvlTrigChannel failed.') if not ADQAPI.ADQ_SetLvlTrigChannel(self.adq_cu, self.adq_num, 1):
if not ADQAPI.ADQ_SetLvlTrigEdge(self.adq_cu, self.adq_num, LVL_RISING): raise RuntimeError('ADQ_SetLvlTrigChannel failed.')
raise RuntimeError('ADQ_SetLvlTrigEdge failed.') if not ADQAPI.ADQ_SetLvlTrigEdge(self.adq_cu, self.adq_num, LVL_RISING):
elif self.trigger == EXT_TRIG_1: raise RuntimeError('ADQ_SetLvlTrigEdge failed.')
if not ADQAPI.ADQ_SetExternTrigEdge(self.adq_cu, self.adq_num, 2): elif self.trigger == EXT_TRIG_1:
raise RuntimeError('ADQ_SetLvlTrigEdge failed.') if not ADQAPI.ADQ_SetExternTrigEdge(self.adq_cu, self.adq_num, 2):
# if not ADQAPI.ADQ_SetTriggerThresholdVoltage(self.adq_cu, self.adq_num, trigger, ct.c_double(0.2)): raise RuntimeError('ADQ_SetLvlTrigEdge failed.')
# raise RuntimeError('SetTriggerThresholdVoltage failed.') # if not ADQAPI.ADQ_SetTriggerThresholdVoltage(self.adq_cu, self.adq_num, trigger, ct.c_double(0.2)):
# proabably the folloiwng is wrong. # raise RuntimeError('SetTriggerThresholdVoltage failed.')
# print("CHANNEL:" + str(ct.c_int(ADQAPI.ADQ_GetLvlTrigChannel(self.adq_cu, self.adq_num)))) # proabably the folloiwng is wrong.
# print("CHANNEL:" + str(ct.c_int(ADQAPI.ADQ_GetLvlTrigChannel(self.adq_cu, self.adq_num))))
def init(self, samples_per_record=None, number_of_records=None):
"""initialize dimensions and store result object""" def init(self, samples_per_record=None, number_of_records=None):
if samples_per_record: """initialize dimensions and store result object"""
self.samples_per_record = samples_per_record if samples_per_record:
if number_of_records: self.samples_per_record = samples_per_record
self.number_of_records = number_of_records if number_of_records:
# Setup target buffers for data self.number_of_records = number_of_records
self.target_buffers = (ct.POINTER(ct.c_int16 * self.samples_per_record * self.number_of_records) # Setup target buffers for data
* self.max_number_of_channels)() self.target_buffers = (ct.POINTER(ct.c_int16 * self.samples_per_record * self.number_of_records)
for bufp in self.target_buffers: * self.max_number_of_channels)()
bufp.contents = (ct.c_int16 * self.samples_per_record * self.number_of_records)() for bufp in self.target_buffers:
bufp.contents = (ct.c_int16 * self.samples_per_record * self.number_of_records)()
def deletecu(self):
cu = self.__dict__.pop('adq_cu', None) def deletecu(self):
if cu is None: cu = self.__dict__.pop('adq_cu', None)
return if cu is None:
print('shut down ADQ') return
# Only disarm trigger after data is collected print('shut down ADQ')
ADQAPI.ADQ_DisarmTrigger(cu, self.adq_num) # Only disarm trigger after data is collected
ADQAPI.ADQ_MultiRecordClose(cu, self.adq_num) ADQAPI.ADQ_DisarmTrigger(cu, self.adq_num)
# Delete ADQControlunit ADQAPI.ADQ_MultiRecordClose(cu, self.adq_num)
ADQAPI.DeleteADQControlUnit(cu) # Delete ADQControlunit
print('ADQ closed') ADQAPI.DeleteADQControlUnit(cu)
print('ADQ closed')
def start(self, data):
# Start acquisition def start(self, data):
ADQAPI.ADQ_MultiRecordSetup(self.adq_cu, self.adq_num, # Start acquisition
self.number_of_records, ADQAPI.ADQ_MultiRecordSetup(self.adq_cu, self.adq_num,
self.samples_per_record) self.number_of_records,
self.samples_per_record)
ADQAPI.ADQ_DisarmTrigger(self.adq_cu, self.adq_num)
ADQAPI.ADQ_ArmTrigger(self.adq_cu, self.adq_num) ADQAPI.ADQ_DisarmTrigger(self.adq_cu, self.adq_num)
self.data = data ADQAPI.ADQ_ArmTrigger(self.adq_cu, self.adq_num)
self.data = data
def get_data(self):
"""get new data if available""" def get_data(self):
ready = False """get new data if available"""
data = self.data ready = False
if not data: data = self.data
self.busy = False if not data:
return None # no new data self.busy = False
return None # no new data
if ADQAPI.ADQ_GetAcquiredAll(self.adq_cu, self.adq_num):
ready = True if ADQAPI.ADQ_GetAcquiredAll(self.adq_cu, self.adq_num):
data.timer('ready') ready = True
else: data.timer('ready')
if self.trigger == SW_TRIG: else:
ADQAPI.ADQ_SWTrig(self.adq_cu, self.adq_num) if self.trigger == SW_TRIG:
if not ready: ADQAPI.ADQ_SWTrig(self.adq_cu, self.adq_num)
self.busy = True if not ready:
return None self.busy = True
self.data = None return None
t = time.time() self.data = None
# Get data from ADQ t = time.time()
if not ADQAPI.ADQ_GetData( # Get data from ADQ
self.adq_cu, self.adq_num, self.target_buffers, if not ADQAPI.ADQ_GetData(
self.samples_per_record * self.number_of_records, 2, self.adq_cu, self.adq_num, self.target_buffers,
0, self.number_of_records, ADQ_CHANNELS_MASK, self.samples_per_record * self.number_of_records, 2,
0, self.samples_per_record, ADQ_TRANSFER_MODE_NORMAL): 0, self.number_of_records, ADQ_CHANNELS_MASK,
raise RuntimeError('no success from ADQ_GetDATA') 0, self.samples_per_record, ADQ_TRANSFER_MODE_NORMAL):
data.retrieve(self) raise RuntimeError('no success from ADQ_GetDATA')
return data data.retrieve(self)
return data
class PEdata:
def __init__(self, adq): class PEdata:
self.sample_rate = adq.sample_rate def __init__(self, adq):
self.samp_freq = self.sample_rate / GHz self.sample_rate = adq.sample_rate
self.number_of_records = adq.number_of_records self.samp_freq = self.sample_rate / GHz
self.timer = Timer() self.number_of_records = adq.number_of_records
self.timer = Timer()
def retrieve(self, adq):
data = [] def retrieve(self, adq):
rawsignal = [] data = []
for ch in range(2): rawsignal = []
onedim = np.frombuffer(adq.target_buffers[ch].contents, dtype=np.int16) for ch in range(2):
rawsignal.append(onedim[:adq.samples_per_record]) onedim = np.frombuffer(adq.target_buffers[ch].contents, dtype=np.int16)
# convert 16 bit int to a value in the range -1 .. 1 rawsignal.append(onedim[:adq.samples_per_record])
data.append(onedim.reshape(adq.number_of_records, adq.samples_per_record) / float(2 ** 15)) # convert 16 bit int to a value in the range -1 .. 1
# Now this is an array with all records, but the time is artificial data.append(onedim.reshape(adq.number_of_records, adq.samples_per_record) / float(2 ** 15))
self.data = data # Now this is an array with all records, but the time is artificial
self.rawsignal = rawsignal self.data = data
self.timer('retrieved') self.rawsignal = rawsignal
self.timer('retrieved')
def sinW(self, sig, freq, ti, tf):
# sig: signal array def sinW(self, sig, freq, ti, tf):
# freq # sig: signal array
# ti, tf: initial and end time # freq
si = int(ti * self.samp_freq) # ti, tf: initial and end time
nperiods = freq * (tf - ti) si = int(ti * self.samp_freq)
n = int(round(max(2, int(nperiods)) / nperiods * (tf-ti) * self.samp_freq)) nperiods = freq * (tf - ti)
self.nperiods = n n = int(round(max(2, int(nperiods)) / nperiods * (tf-ti) * self.samp_freq))
t = np.arange(si, len(sig)) / self.samp_freq self.nperiods = n
t = t[:n] t = np.arange(si, len(sig)) / self.samp_freq
self.pulselen = n / self.samp_freq t = t[:n]
sig = sig[si:si+n] self.pulselen = n / self.samp_freq
a = 2*np.sum(sig*np.cos(2*np.pi*freq*t))/len(sig) sig = sig[si:si+n]
b = 2*np.sum(sig*np.sin(2*np.pi*freq*t))/len(sig) a = 2*np.sum(sig*np.cos(2*np.pi*freq*t))/len(sig)
return a, b b = 2*np.sum(sig*np.sin(2*np.pi*freq*t))/len(sig)
return a, b
def mix(self, sigin, sigout, freq, ti, tf):
# sigin, sigout: signal array, incomping, output def mix(self, sigin, sigout, freq, ti, tf):
# freq # sigin, sigout: signal array, incomping, output
# ti, tf: initial and end time of sigin # freq
a, b = self.sinW(sigin, freq, ti, tf) # ti, tf: initial and end time of sigin
amp = np.sqrt(a**2 + b**2) a, b = self.sinW(sigin, freq, ti, tf)
a, b = a/amp, b/amp amp = np.sqrt(a**2 + b**2)
# si = int(ti * self.samp_freq) a, b = a/amp, b/amp
t = np.arange(len(sigout)) / self.samp_freq # si = int(ti * self.samp_freq)
wave1 = sigout * (a * np.cos(2*np.pi*freq*t) + b * np.sin(2*np.pi*freq*t)) t = np.arange(len(sigout)) / self.samp_freq
wave2 = sigout * (a * np.sin(2*np.pi*freq*t) - b * np.cos(2*np.pi*freq*t)) wave1 = sigout * (a * np.cos(2*np.pi*freq*t) + b * np.sin(2*np.pi*freq*t))
return wave1, wave2 wave2 = sigout * (a * np.sin(2*np.pi*freq*t) - b * np.cos(2*np.pi*freq*t))
return wave1, wave2
def averageiq(self, data, freq, ti, tf):
"""Average over records""" def averageiq(self, data, freq, ti, tf):
iorq = np.array([self.mix(data[0][i], data[1][i], freq, ti, tf) for i in range(self.number_of_records)]) """Average over records"""
return iorq.sum(axis=0) / self.number_of_records iorq = np.array([self.mix(data[0][i], data[1][i], freq, ti, tf) for i in range(self.number_of_records)])
return iorq.sum(axis=0) / self.number_of_records
def filtro(self, iorq, cutoff):
# butter lowpass def filtro(self, iorq, cutoff):
nyq = 0.5 * self.sample_rate # butter lowpass
normal_cutoff = cutoff / nyq nyq = 0.5 * self.sample_rate
order = 5 normal_cutoff = cutoff / nyq
b, a = butter(order, normal_cutoff, btype='low', analog=False) order = 5
iqf = [filtfilt(b, a, iorq[i]) for i in np.arange(len(iorq))] b, a = butter(order, normal_cutoff, btype='low', analog=False)
return iqf iqf = [filtfilt(b, a, iorq[i]) for i in np.arange(len(iorq))]
return iqf
def box(self, iorq, ti, tf):
si = int(self.samp_freq * ti) def box(self, iorq, ti, tf):
sf = int(self.samp_freq * tf) si = int(self.samp_freq * ti)
bxa = [sum(iorq[i][si:sf])/(sf-si) for i in np.arange(len(iorq))] sf = int(self.samp_freq * tf)
return bxa bxa = [sum(iorq[i][si:sf])/(sf-si) for i in np.arange(len(iorq))]
return bxa
def gates_and_curves(self, freq, pulse, roi, bw_cutoff):
"""return iq values of rois and prepare plottable curves for iq""" def gates_and_curves(self, freq, pulse, roi, bw_cutoff):
self.timer('gates') """return iq values of rois and prepare plottable curves for iq"""
try: self.timer('gates')
self.ndecimate = int(round(self.sample_rate / freq)) try:
except TypeError as e: self.ndecimate = int(round(self.sample_rate / freq))
raise TypeError(f'{self.sample_rate}/{freq} {e}') except TypeError as e:
iq = self.averageiq(self.data, freq / GHz, *pulse) raise TypeError(f'{self.sample_rate}/{freq} {e}')
self.timer('aviq') iq = self.averageiq(self.data, freq / GHz, *pulse)
iqf = self.filtro(iq, bw_cutoff) self.timer('aviq')
self.timer('filtro') iqf = self.filtro(iq, bw_cutoff)
m = max(1, len(iqf[0]) // self.ndecimate) self.timer('filtro')
ll = m * self.ndecimate m = max(1, len(iqf[0]) // self.ndecimate)
iqf = [iqfx[0:ll] for iqfx in iqf] ll = m * self.ndecimate
self.timer('iqf') iqf = [iqfx[0:ll] for iqfx in iqf]
iqd = np.average(np.resize(iqf, (2, m, self.ndecimate)), axis=2) self.timer('iqf')
self.timer('avg') iqd = np.average(np.resize(iqf, (2, m, self.ndecimate)), axis=2)
t_axis = np.arange(m) * self.ndecimate / self.samp_freq self.timer('avg')
pulsig = np.abs(self.data[0][0]) t_axis = np.arange(m) * self.ndecimate / self.samp_freq
self.timer('pulsig') pulsig = np.abs(self.data[0][0])
pulsig = np.average(np.resize(pulsig, (m, self.ndecimate)), axis=1) self.timer('pulsig')
result = ([self.box(iqf, *r) for r in roi], # gates pulsig = np.average(np.resize(pulsig, (m, self.ndecimate)), axis=1)
(t_axis, iqd[0], iqd[1], pulsig)) # curves result = ([self.box(iqf, *r) for r in roi], # gates
self.timer('result') (t_axis, iqd[0], iqd[1], pulsig)) # curves
# self.timer.show() self.timer('result')
# ns = len(self.rawsignal[0]) * self.number_of_records # self.timer.show()
# print(f'{ns} {ns / 2e6} ms') # ns = len(self.rawsignal[0]) * self.number_of_records
return result # print(f'{ns} {ns / 2e6} ms')
return result
class Namespace:
"""holds channel or other data""" class Namespace:
def __init__(self, **kwds): """holds channel or other data"""
self.__dict__.update(**kwds) def __init__(self, **kwds):
self.__dict__.update(**kwds)
class RUSdata:
def __init__(self, adq, freq, periods, delay_samples): class RUSdata:
self.sample_rate = adq.sample_rate def __init__(self, adq, freq, periods, delay_samples):
self.freq = freq self.sample_rate = adq.sample_rate
self.periods = periods self.freq = freq
self.delay_samples = delay_samples self.periods = periods
self.samples_per_record = adq.samples_per_record self.delay_samples = delay_samples
self.inp = Namespace(idx=0, name='input') self.samples_per_record = adq.samples_per_record
self.out = Namespace(idx=1, name='output') self.inp = Namespace(idx=0, name='input')
self.channels = (self.inp, self.out) self.out = Namespace(idx=1, name='output')
self.timer = Timer() self.channels = (self.inp, self.out)
self.timer = Timer()
def retrieve(self, adq):
self.timer('start retrieve') def retrieve(self, adq):
npts = self.samples_per_record - self.delay_samples self.timer('start retrieve')
nbin = max(1, npts // (self.periods * 60)) # for performance reasons, do the binning first npts = self.samples_per_record - self.delay_samples
nreduced = npts // nbin nbin = max(1, npts // (self.periods * 60)) # for performance reasons, do the binning first
ft = 2 * np.pi * self.freq * nbin / self.sample_rate * np.arange(nreduced) nreduced = npts // nbin
self.timer('create time axis') ft = 2 * np.pi * self.freq * nbin / self.sample_rate * np.arange(nreduced)
# complex_sinusoid = np.exp(1j * ft) # do not use this, below is 33 % faster self.timer('create time axis')
complex_sinusoid = 1j * np.sin(ft) + np.cos(ft) # complex_sinusoid = np.exp(1j * ft) # do not use this, below is 33 % faster
self.timer('sinusoid') complex_sinusoid = 1j * np.sin(ft) + np.cos(ft)
self.timer('sinusoid')
rawsignal = [] # for raw plot
for chan in self.channels: # looping over input and output rawsignal = [] # for raw plot
# although the ADC is only 14 bit it is represented as unsigend 16 bit numbers, for chan in self.channels: # looping over input and output
# and due to some calculations (calibration) the last 2 bits are not zero # although the ADC is only 14 bit it is represented as unsigend 16 bit numbers,
beg = self.delay_samples # and due to some calculations (calibration) the last 2 bits are not zero
isignal = np.frombuffer(adq.target_buffers[chan.idx].contents, dtype=np.int16)[beg:beg+nreduced * nbin] beg = self.delay_samples
self.timer('isignal') isignal = np.frombuffer(adq.target_buffers[chan.idx].contents, dtype=np.int16)[beg:beg+nreduced * nbin]
reduced = isignal.reshape((-1, nbin)).mean(axis=1) # this converts also int16 to float self.timer('isignal')
self.timer('reduce') reduced = isignal.reshape((-1, nbin)).mean(axis=1) # this converts also int16 to float
rawsignal.append(reduced) self.timer('reduce')
chan.signal = signal = reduced * 2 ** -16 # in V -> peak to peak 1 V ~ +- 0.5 V rawsignal.append(reduced)
self.timer('divide') chan.signal = signal = reduced * 2 ** -16 # in V -> peak to peak 1 V ~ +- 0.5 V
# calculate RMS * sqrt(2) -> peak sinus amplitude. self.timer('divide')
# may be higher than the input range by a factor 1.4 when heavily clipped # calculate RMS * sqrt(2) -> peak sinus amplitude.
chan.amplitude = np.sqrt((signal ** 2).mean()) * RMS_TO_VPP # may be higher than the input range by a factor 1.4 when heavily clipped
self.timer('amp') chan.amplitude = np.sqrt((signal ** 2).mean()) * RMS_TO_VPP
chan.mixed = signal * complex_sinusoid self.timer('amp')
self.timer('mix') chan.mixed = signal * complex_sinusoid
chan.mean = chan.mixed.mean() self.timer('mix')
self.timer('mean') chan.mean = chan.mixed.mean()
self.rawsignal = rawsignal self.timer('mean')
if self.inp.mean: self.rawsignal = rawsignal
self.iq = self.out.mean / self.inp.mean if self.inp.mean:
else: self.iq = self.out.mean / self.inp.mean
self.iq = 0 else:
self.iq = 0
def get_quality(self):
"""get signal quality info def get_quality(self):
"""get signal quality info
quality info (small values indicate good quality):
- input_stddev: quality info (small values indicate good quality):
the imaginary part indicates deviations in phase - input_stddev:
the real part indicates deviations in amplitude the imaginary part indicates deviations in phase
- output_slope: the real part indicates deviations in amplitude
the imaginary part indicates a turning phase (rad/sec) - output_slope:
the real part indicates changes in amplitude (0.01 ~= 1%/sec) the imaginary part indicates a turning phase (rad/sec)
""" the real part indicates changes in amplitude (0.01 ~= 1%/sec)
self.timer('get_quality') """
npts = len(self.channels[0].signal) self.timer('get_quality')
nper = npts // self.periods npts = len(self.channels[0].signal)
for chan in self.channels: nper = npts // self.periods
mean = chan.mixed.mean() for chan in self.channels:
chan.reduced = chan.mixed[:self.periods * nper].reshape((-1, nper)).mean(axis=1) / mean mean = chan.mixed.mean()
chan.reduced = chan.mixed[:self.periods * nper].reshape((-1, nper)).mean(axis=1) / mean
timeaxis = np.arange(len(self.out.reduced)) * self.sample_rate / self.freq
result = Namespace( timeaxis = np.arange(len(self.out.reduced)) * self.sample_rate / self.freq
input_stddev=self.inp.reduced.std(), result = Namespace(
output_slope=np.polyfit(timeaxis, self.out.reduced, 1)[0]) input_stddev=self.inp.reduced.std(),
self.timer('got_quality') output_slope=np.polyfit(timeaxis, self.out.reduced, 1)[0])
self.timer.show() self.timer('got_quality')
ns = len(self.rawsignal[0]) self.timer.show()
print(f'{ns} {ns / 2e6} ms') ns = len(self.rawsignal[0])
return result print(f'{ns} {ns / 2e6} ms')
return result