frappy/frappy_psi/adq_mr.py
Markus Zolliker f8c52af3ac frappy_psi.ultrasound: after rework (still wip)
Change-Id: I200cbeca2dd0f030a01a78ba4d38c342c3c8c8e3
2025-03-17 09:37:13 +01:00

332 lines
13 KiB
Python

# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Damaris Tartarotti Maimone
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""Wrapper for the ADQ data acquisition card for ultrasound"""
import sys
import atexit
import signal
import time
import numpy as np
import ctypes as ct
from scipy.signal import butter, filtfilt
# For different trigger modes
SW_TRIG = 1
# The following external trigger does not work if the level of the trigger is very close to 0.5V.
# Now we have it close to 3V, and it works
EXT_TRIG_1 = 2
EXT_TRIG_2 = 7
EXT_TRIG_3 = 8
LVL_TRIG = 3
INT_TRIG = 4
LVL_FALLING = 0
LVL_RISING = 1
ADQ_CLOCK_INT_INTREF = 0 # internal clock source
ADQ_CLOCK_EXT_REF = 1 # internal clock source, external reference
ADQ_CLOCK_EXT_CLOCK = 2 # External clock source
ADQ_TRANSFER_MODE_NORMAL = 0x00
ADQ_CHANNELS_MASK = 0x3
GHz = 1e9
class Adq:
sample_rate = 2 * GHz
max_number_of_channels = 2
ndecimate = 50 # decimation ratio (2GHz / 40 MHz)
number_of_records = 1
samples_per_record = 16384
bw_cutoff = 10E6
trigger = EXT_TRIG_1
adq_num = 1
data = None
busy = False
def __init__(self):
global ADQAPI
ADQAPI = ct.cdll.LoadLibrary("libadq.so.0")
ADQAPI.ADQAPI_GetRevision()
# Manually set return type from some ADQAPI functions
ADQAPI.CreateADQControlUnit.restype = ct.c_void_p
ADQAPI.ADQ_GetRevision.restype = ct.c_void_p
ADQAPI.ADQ_GetPtrStream.restype = ct.POINTER(ct.c_int16)
ADQAPI.ADQControlUnit_FindDevices.argtypes = [ct.c_void_p]
# Create ADQControlUnit
self.adq_cu = ct.c_void_p(ADQAPI.CreateADQControlUnit())
ADQAPI.ADQControlUnit_EnableErrorTrace(self.adq_cu, 3, '.')
# Find ADQ devices
ADQAPI.ADQControlUnit_FindDevices(self.adq_cu)
n_of_adq = ADQAPI.ADQControlUnit_NofADQ(self.adq_cu)
if n_of_adq != 1:
print('number of ADQs must be 1, not %d' % n_of_adq)
print('it seems the ADQ was not properly closed')
raise RuntimeError('please try again or reboot')
atexit.register(self.deletecu)
signal.signal(signal.SIGTERM, lambda *_: sys.exit(0))
rev = ADQAPI.ADQ_GetRevision(self.adq_cu, self.adq_num)
revision = ct.cast(rev, ct.POINTER(ct.c_int))
print('\nConnected to ADQ #1')
# Print revision information
print('FPGA Revision: {}'.format(revision[0]))
if revision[1]:
print('Local copy')
else:
print('SVN Managed')
if revision[2]:
print('Mixed Revision')
else:
print('SVN Updated')
print('')
ADQAPI.ADQ_SetClockSource(self.adq_cu, self.adq_num, ADQ_CLOCK_EXT_REF)
##########################
# Test pattern
# ADQAPI.ADQ_SetTestPatternMode(self.adq_cu, self.adq_num, 4)
##########################
# Sample skip
# ADQAPI.ADQ_SetSampleSkip(self.adq_cu, self.adq_num, 1)
##########################
# set trigger mode
if not ADQAPI.ADQ_SetTriggerMode(self.adq_cu, self.adq_num, self.trigger):
raise RuntimeError('ADQ_SetTriggerMode failed.')
if self.trigger == LVL_TRIG:
if not ADQAPI.ADQ_SetLvlTrigLevel(self.adq_cu, self.adq_num, -100):
raise RuntimeError('ADQ_SetLvlTrigLevel failed.')
if not ADQAPI.ADQ_SetTrigLevelResetValue(self.adq_cu, self.adq_num, 1000):
raise RuntimeError('ADQ_SetTrigLevelResetValue failed.')
if not ADQAPI.ADQ_SetLvlTrigChannel(self.adq_cu, self.adq_num, 1):
raise RuntimeError('ADQ_SetLvlTrigChannel failed.')
if not ADQAPI.ADQ_SetLvlTrigEdge(self.adq_cu, self.adq_num, LVL_RISING):
raise RuntimeError('ADQ_SetLvlTrigEdge failed.')
elif self.trigger == EXT_TRIG_1:
if not ADQAPI.ADQ_SetExternTrigEdge(self.adq_cu, self.adq_num, 2):
raise RuntimeError('ADQ_SetLvlTrigEdge failed.')
# if not ADQAPI.ADQ_SetTriggerThresholdVoltage(self.adq_cu, self.adq_num, trigger, ct.c_double(0.2)):
# raise RuntimeError('SetTriggerThresholdVoltage failed.')
print("CHANNEL:"+str(ct.c_int(ADQAPI.ADQ_GetLvlTrigChannel(self.adq_cu, self.adq_num))))
def init(self, samples_per_record=None, number_of_records=None):
"""initialize dimensions and store result object"""
if samples_per_record:
self.samples_per_record = samples_per_record
if number_of_records:
self.number_of_records = number_of_records
# Setup target buffers for data
self.target_buffers = (ct.POINTER(ct.c_int16 * self.samples_per_record * self.number_of_records)
* self.max_number_of_channels)()
for bufp in self.target_buffers:
bufp.contents = (ct.c_int16 * self.samples_per_record * self.number_of_records)()
def deletecu(self):
cu = self.__dict__.pop('adq_cu', None)
if cu is None:
print('deletecu already called')
return
print('deletecu called')
# Only disarm trigger after data is collected
ADQAPI.ADQ_DisarmTrigger(cu, self.adq_num)
ADQAPI.ADQ_MultiRecordClose(cu, self.adq_num)
print('delete cu')
# Delete ADQControlunit
ADQAPI.DeleteADQControlUnit(cu)
print('cu deleted')
def start(self, data):
# Start acquisition
ADQAPI.ADQ_MultiRecordSetup(self.adq_cu, self.adq_num,
self.number_of_records,
self.samples_per_record)
ADQAPI.ADQ_DisarmTrigger(self.adq_cu, self.adq_num)
ADQAPI.ADQ_ArmTrigger(self.adq_cu, self.adq_num)
self.data = data
def get_data(self):
"""get new data if available"""
ready = False
data = self.data
if not data:
self.busy = False
return None # no new data
if ADQAPI.ADQ_GetAcquiredAll(self.adq_cu, self.adq_num):
ready = True
else:
if self.trigger == SW_TRIG:
ADQAPI.ADQ_SWTrig(self.adq_cu, self.adq_num)
if not ready:
self.busy = True
return None
# Get data from ADQ
if not ADQAPI.ADQ_GetData(
self.adq_cu, self.adq_num, self.target_buffers,
self.samples_per_record * self.number_of_records, 2,
0, self.number_of_records, ADQ_CHANNELS_MASK,
0, self.samples_per_record, ADQ_TRANSFER_MODE_NORMAL):
raise RuntimeError('no success from ADQ_GetDATA')
self.data = None
data.retrieve(self)
return data
class PEdata:
def __init__(self, adq):
self.sample_rate = adq.sample_rate
self.samp_freq = self.sample_rate / GHz
self.number_of_records = adq.number_of_records
data = []
for ch in range(2):
onedim = np.frombuffer(adq.target_buffers[ch].contents, dtype=np.int16)
data.append(onedim.reshape(adq.number_of_records, adq.samples_per_record) / float(2**14)) # 14 bits ADC
# Now this is an array with all records, but the time is artificial
self.data = data
def sinW(self, sig, freq, ti, tf):
# sig: signal array
# freq
# ti, tf: initial and end time
si = int(ti * self.samp_freq)
nperiods = freq * (tf - ti)
n = int(round(max(2, int(nperiods)) / nperiods * (tf-ti) * self.samp_freq))
self.nperiods = n
t = np.arange(si, len(sig)) / self.samp_freq
t = t[:n]
self.pulselen = n / self.samp_freq
sig = sig[si:si+n]
a = 2*np.sum(sig*np.cos(2*np.pi*freq*t))/len(sig)
b = 2*np.sum(sig*np.sin(2*np.pi*freq*t))/len(sig)
return a, b
def mix(self, sigin, sigout, freq, ti, tf):
# sigin, sigout: signal array, incomping, output
# freq
# ti, tf: initial and end time of sigin
a, b = self.sinW(sigin, freq, ti, tf)
amp = np.sqrt(a**2 + b**2)
a, b = a/amp, b/amp
# si = int(ti * self.samp_freq)
t = np.arange(len(sigout)) / self.samp_freq
wave1 = sigout * (a * np.cos(2*np.pi*freq*t) + b * np.sin(2*np.pi*freq*t))
wave2 = sigout * (a * np.sin(2*np.pi*freq*t) - b * np.cos(2*np.pi*freq*t))
return wave1, wave2
def averageiq(self, data, freq, ti, tf):
"""Average over records"""
iorq = np.array([self.mix(data[0][i], data[1][i], freq, ti, tf) for i in range(self.number_of_records)])
return iorq.sum(axis=0) / self.number_of_records
def filtro(self, iorq, cutoff):
# butter lowpass
nyq = 0.5 * self.sample_rate
normal_cutoff = cutoff / nyq
order = 5
b, a = butter(order, normal_cutoff, btype='low', analog=False)
iqf = [filtfilt(b, a, iorq[i]) for i in np.arange(len(iorq))]
return iqf
def box(self, iorq, ti, tf):
si = int(self.samp_freq * ti)
sf = int(self.samp_freq * tf)
bxa = [sum(iorq[i][si:sf])/(sf-si) for i in np.arange(len(iorq))]
return bxa
def gates_and_curves(self, freq, pulse, roi, bw_cutoff):
"""return iq values of rois and prepare plottable curves for iq"""
self.ndecimate = int(round(self.sample_rate / freq))
# times = []
# times.append(('aviq', time.time()))
iq = self.averageiq(self.data, freq / GHz, *pulse)
# times.append(('filtro', time.time()))
iqf = self.filtro(iq, bw_cutoff)
m = len(iqf[0]) // self.ndecimate
ll = m * self.ndecimate
iqf = [iqfx[0:ll] for iqfx in iqf]
# times.append(('iqdec', time.time()))
iqd = np.average(np.resize(iqf, (2, m, self.ndecimate)), axis=2)
t_axis = np.arange(m) * self.ndecimate / self.samp_freq
pulsig = np.abs(self.data[0][0])
# times.append(('pulsig', time.time()))
pulsig = np.average(np.resize(pulsig, (m, self.ndecimate)), axis=1)
self.curves = (t_axis, iqd[0], iqd[1], pulsig)
# print(times)
return [self.box(iqf, *r) for r in roi]
class Namespace:
"""holds channel or other data"""
def __init__(self, **kwds):
self.__dict__.update(**kwds)
class RUSdata:
def __init__(self, adq, freq, periods):
self.sample_rate = adq.sample_rate
self.freq = freq
self.periods = periods
self.samples_per_record = adq.samples_per_record
self.inp = Namespace(idx=0, name='input')
self.out = Namespace(idx=1, name='output')
self.channels = (self.inp, self.out)
def retrieve(self, adq):
npts = self.samples_per_record
complex_sinusoid = np.exp(1j * 2 * np.pi * self.freq / self.sample_rate * np.arange(npts))
for chan in self.channels: # looping over input and output
signal = np.frombuffer(adq.target_buffers[chan.idx].contents, dtype=np.int16)
chan.ovr_rate = (np.abs(signal) > 8000).sum() / npts # fraction of points outside range
# calculate RMS * sqrt(2) -> peak sinus amplitude.
# may be higher than the input range by a factor 1.4 when heavily clipped
# we need to convert signal from int16 to float -> use 2.0 as exponent does the trick
chan.amplitude = np.sqrt((signal ** 2.0).mean())
chan.mixed = signal * complex_sinusoid
chan.mean = chan.mixed.mean()
if self.inp.mean:
self.iq = self.out.mean / self.inp.mean
else:
self.iq = 0
def get_quality(self):
"""get signal quality info
quality info (small values indicate good quality):
- input_stddev:
the imaginary part indicates deviations in phase
the real part indicates deviations in amplitude
- output_slope:
the imaginary part indicates a turning phase (rad/sec)
the real part indicates changes in amplitude (0.01 ~= 1%/sec)
"""
nper = self.samples_per_record // self.periods
for chan in self.channels:
mean = chan.mixed.mean()
chan.reduced = chan.mixed[:self.periods * nper].reshape((-1, nper)).mean(axis=0) / mean
timeaxis = np.arange(len(self.out.reduced)) * self.sample_rate / self.freq
return Namespace(
input_stddev = self.inp.reduced.std(),
output_slope = np.polyfit(timeaxis, self.out.reduced, 1)[0])