frappy/frappy_psi/ultrasound.py
Ultrasound PC d9f340dce6 ultrasound: change control roi0 to a Readable (2)
+ remove cfg/PEUS.py
+ fix equipment_id of PEUS
+ add header to frappy_psi.iqplot
2025-03-26 16:45:53 +01:00

499 lines
18 KiB
Python

# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""frappy support for ultrasound"""
import math
import os
import time
import numpy as np
from frappy_psi.adq_mr import Adq, PEdata, RUSdata
from frappy.core import Attached, BoolType, Done, FloatRange, HasIO, StatusType, \
IntRange, Module, Parameter, Readable, Writable, StatusType, StringIO, StringType, \
IDLE, BUSY, DISABLED, WARN, ERROR, TupleOf, ArrayOf, Command, Attached, EnumType ,\
Drivable
from frappy.properties import Property
from frappy.lib import clamp
# from frappy.modules import Collector
Collector = Readable
def fname_from_time(t, extension):
tm = time.localtime(t)
dirname = os.path.join('..', 'data', time.strftime("%Y-%m-%d_%H", tm))
filename = time.strftime("%Y-%m-%d_%H-%M-%S_", tm)
filename = filename + ("%.1f" % t)[-1]
if not os.path.isdir(dirname):
os.makedirs(dirname)
return os.path.join(dirname, filename)
class Roi(Readable):
main = Attached()
a = Attached(mandatory=False) # amplitude Readable
p = Attached(mandatory=False) # phase Readable
i = Attached(mandatory=False) # i Readable
q = Attached(mandatory=False) # amplitude Readable
value = Parameter('i, q', TupleOf(FloatRange(), FloatRange()), default=(0, 0))
time = Parameter('start time', FloatRange(unit='nsec'), readonly=False)
size = Parameter('interval (symmetric around time)', FloatRange(unit='nsec'), readonly=False)
enable = Parameter('calculate this roi', BoolType(), readonly=False, default=True)
pollinterval = Parameter(export=False)
interval = (0, 0)
def initModule(self):
super().initModule()
self.main.register_roi(self)
self.calc_interval()
def calc_interval(self):
self.interval = (self.time - 0.5 * self.size, self.time + 0.5 * self.size)
def read_status(self):
return (IDLE, '') if self.enable else (DISABLED, 'disabled')
def write_time(self, value):
self.time = value
self.calc_interval()
return Done
def write_size(self, value):
self.size = value
self.calc_interval()
return Done
class ControlRoi(Roi, Readable):
freq = Attached()
maxstep = Parameter('max frequency step', FloatRange(unit='Hz'), readonly=False,
default=10000)
minstep = Parameter('min frequency step for slope calculation', FloatRange(unit='Hz'),
readonly=False, default=4000)
slope = Parameter('inphase/frequency slope', FloatRange(), readonly=False,
default=1e6)
control_active = Parameter('are we controlling?', BoolType(), readonly=False, default=0)
_freq_target = None
_skipctrl = 2
_old = None
def doPoll(self):
inphase = self.value[0]
freq = self.freq.target
if freq != self._freq_target:
self._freq_target = freq
# do no control 2 times after changing frequency
self._skipctrl = 2
if self.control_active:
if self._old:
newfreq = freq + inphase * self.slope
fdif = freq - self._old[0]
if abs(fdif) >= self.minstep:
idif = inphase - self._old[1]
self.slope = - fdif / idif
else:
# do a 'test' step
newfreq = freq + self.minstep
self.old = (freq, inphase)
if self._skipctrl > 0: # do no control for some time after changing frequency
self._skipctrl -= 1
elif self.control_active:
self._freq_target = self.freq.write_target(clamp(freq - self.maxstep, newfreq, freq + self.maxstep))
class Pars(Module):
description = 'relevant parameters from SEA'
timestamp = Parameter('unix timestamp', StringType(), default='0', readonly=False)
temperature = Parameter('T', FloatRange(unit='K'), default=0, readonly=False)
mf = Parameter('field', FloatRange(unit='T'), default=0, readonly=False)
sr = Parameter('rotation angle', FloatRange(unit='deg'), default=0, readonly=False)
class FreqStringIO(StringIO):
end_of_line = '\r\n'
class Frequency(HasIO, Drivable):
value = Parameter('frequency', unit='Hz')
amp = Parameter('amplitude (VPP)', FloatRange(unit='V'), readonly=False)
output = Parameter('output: L or R', EnumType(L=1, R=0), readonly=False, default='L')
last_change = 0
ioClass = FreqStringIO
dif = None
_started = 0
_within_write_target = False
_nopoll_until = 0
def doPoll(self):
super().doPoll()
if self.isBusy() and time.time() > self._started + 5:
self.status = WARN, 'acquisition timeout'
def register_dif(self, dif):
self.dif = dif
def read_value(self):
if time.time() > self._nopoll_until or self.value == 0:
self.value = float(self.communicate('FREQ?'))
if self.dif:
self.dif.read_value()
return self.value
def set_busy(self):
"""called by an acquisition module
from a callback on value within write_target
"""
if self._within_write_target:
self._started = time.time()
self.log.info('set busy')
self.status = BUSY, 'waiting for acquisition'
def set_idle(self):
if self.isBusy():
self.status = IDLE, ''
def write_target(self, value):
self._nopoll_until = time.time() + 10
try:
self._within_write_target = True
# may trigger busy=True from an acquisition module
self.value = float(self.communicate('FREQ %.15g;FREQ?' % value))
self.last_change = time.time()
if self.dif:
self.dif.read_value()
return self.value
finally:
self._within_write_target = False
def write_amp(self, amp):
self._nopoll_until = time.time() + 10
self.amp = float(self.communicate(f'AMP{self.output.name} {amp} VPP;AMP{self.output.name}? VPP'))
return self.amp
def read_amp(self):
if time.time() > self._nopoll_until or self.amp == 0:
return float(self.communicate(f'AMP{self.output.name}? VPP'))
return self.amp
class FrequencyDif(Drivable):
freq = Attached(Frequency)
base = Parameter('base frequency', FloatRange(unit='Hz'), default=0)
value = Parameter('difference to base frequency', FloatRange(unit='Hz'), default=0)
def initModule(self):
super().initModule()
self.freq.register_dif(self)
def write_value(self, target):
self.freq.write_target(target + self.base)
return self.value # this was updated in Frequency
def read_value(self):
return self.freq.value - self.base
def read_status(self):
return self.freq.read_status()
class Base:
freq = Attached()
sr = Parameter('samples per record', datatype=IntRange(1, 1E9), default=16384)
adq = None
_rawsignal = None
_fast_poll = 0.001
def shutdownModule(self):
if self.adq:
self.adq.deletecu()
self.adq = None
@Command(argument=TupleOf(FloatRange(unit='ns'), FloatRange(unit='ns'), IntRange(0,99999)),
result=TupleOf(FloatRange(),
ArrayOf(ArrayOf(IntRange(-0x7fff, 0x7fff), 0, 99999))))
def get_signal(self, start, end, npoints):
"""get signal
:param start: start time (ns)
:param end: end time (ns)
:param npoints: hint for number of data points
:return: (<time-step>, array of array of y)
for performance reasons the result data is rounded to int16
"""
# convert ns to samples
sr = self.adq.sample_rate * 1e-9
istart = round(start * sr)
iend = min(self.sr, round(end * sr))
nbin = max(1, round((iend - istart) / npoints))
iend = iend // nbin * nbin
return (nbin / sr,
[np.round(ch[istart:iend].reshape((-1, nbin)).mean(axis=1)) for ch in self._rawsignal])
class PulseEcho(Base, Readable):
value = Parameter(default=0)
nr = Parameter('number of records', datatype=IntRange(1, 9999), default=500)
bw = Parameter('bandwidth lowpassfilter', datatype=FloatRange(unit='Hz'), default=10E6)
time = Parameter('pulse start time', FloatRange(unit='nsec'),
readonly=False)
size = Parameter('pulse length (starting from time)', FloatRange(unit='nsec'),
readonly=False)
pulselen = Parameter('adjusted pulse length (integer number of periods)', FloatRange(unit='nsec'), default=1)
# curves = Attached(mandatory=False)
pollinterval = Parameter('poll interval', datatype=FloatRange(0,120))
_starttime = None
def initModule(self):
super().initModule()
self.adq = Adq()
self.adq.init(self.sr, self.nr)
self.roilist = []
self.setFastPoll(True, self._fast_poll)
def doPoll(self):
try:
data = self.adq.get_data()
except Exception as e:
self.status = ERROR, repr(e)
return
if data is None:
if self.adq.busy:
return
self.adq.start(PEdata(self.adq))
self.setFastPoll(True, self._fast_poll)
return
roilist = [r for r in self.roilist if r.enable]
freq = self.freq.read_value()
if not freq:
self.log.info('freq=0')
return
gates, curves = data.gates_and_curves(
freq, (self.time, self.time + self.size),
[(r.time, r.time + r.size) for r in roilist], self.bw)
for i, roi in enumerate(roilist):
a = gates[i][0]
b = gates[i][1]
roi.value = a, b
if roi.i:
roi.i.value = a
if roi.q:
roi.q.value = b
if roi.a:
roi.a.value = math.sqrt(a ** 2 + b ** 2)
if roi.p:
roi.p.value = math.atan2(a, b) * 180 / math.pi
self._curves = curves
self._rawsignal = data.rawsignal
@Command(result=TupleOf(*[ArrayOf(FloatRange(), 0, 99999)
for _ in range(4)]))
def get_curves(self):
"""retrieve curves"""
return self._curves
def write_nr(self, value):
self.adq.init(self.sr, value)
def write_sr(self, value):
self.adq.init(value, self.nr)
def write_bw(self, value):
self.adq.bw_cutoff = value
def register_roi(self, roi):
self.roilist.append(roi)
CONTINUE = 0
GO = 1
DONE_GO = 2
WAIT_GO = 3
class RUS(Base, Collector):
freq = Attached()
imod = Attached(mandatory=False)
qmod = Attached(mandatory=False)
value = Parameter('averaged (I, Q) tuple', TupleOf(FloatRange(), FloatRange()))
status = Parameter(datatype=StatusType(Readable, 'BUSY'))
periods = Parameter('number of periods', IntRange(1, 999999), default=12, readonly=False)
input_delay = Parameter('throw away everything before this time',
FloatRange(unit='ns'), default=10000, readonly=False)
input_range = Parameter('input range (taking into account attenuation)', FloatRange(unit='V'),
default=10, readonly=False)
output_range = Parameter('output range', FloatRange(unit='V'),
default=1, readonly=False)
input_amplitude = Parameter('input signal amplitude', FloatRange(unit='V'), default=0)
output_amplitude = Parameter('output signal amplitude', FloatRange(unit='V'), default=0)
phase = Parameter('phase', FloatRange(unit='deg'), default=0)
amp = Parameter('amplitude', FloatRange(), default=0)
continuous = Parameter('continuous mode', BoolType(), readonly=False, default=True)
pollinterval = Parameter(datatype=FloatRange(0, 120), default=5)
_starttime = None
_iq = 0
_wait_until = 0 # deadline for returning to continuous mode
_action = CONTINUE # one of CONTINUE, GO, DONE_GO, WAIT_GO
_status = IDLE, 'no data yet'
_busy = False # waiting for end of aquisition (not the same as self.status[0] == BUSY)
_requested_freq = None
def initModule(self):
super().initModule()
self.adq = Adq()
self.freq.addCallback('value', self.update_freq)
self.freq.addCallback('target', self.update_freq_target)
# self.write_periods(self.periods)
def update_freq_target(self, value):
self.go()
def update_freq(self, value):
self.setFastPoll(True, self._fast_poll)
self._requested_freq = value
self.freq.set_busy() # is only effective when the update was trigger within freq.write_target
def get_quality_info(self, data):
"""hook for RESqual"""
data.timer.show()
def doPoll(self):
try:
data = self.adq.get_data()
except Exception as e:
self.set_status(ERROR, repr(e))
self._busy = False
self._action = WAIT_GO
self.wait_until = time.time() + 2
return
if data: # this is new data
self._busy = False
self.get_quality_info(data) # hook for RUSqual
self._rawsignal = data.rawsignal
self.input_amplitude = data.inp.amplitude * self.input_range
self.output_amplitude = data.out.amplitude * self.output_range
self._iq = iq = data.iq * self.output_range / self.input_range
self.phase = np.arctan2(iq.imag, iq.real) * 180 / np.pi
self.amp = np.abs(iq)
self.read_value()
self.set_status(IDLE, '')
if self.freq.isBusy():
if data.freq == self._requested_freq:
self.log.info('set freq idle %.3f', time.time() % 1.0)
self.freq.set_idle()
else:
self.log.warn('freq does not match: requested %.14g, from data: %.14g',
self._requested_freq, data.freq)
else:
self.log.info('freq not busy %.3f', time.time() % 1.0)
if self._action == CONTINUE:
self.setFastPoll(False)
self.log.info('slow')
return
elif self._busy:
if self._action == DONE_GO:
self.log.info('busy')
self.set_status(BUSY, 'acquiring')
else:
self.set_status(IDLE, 'acquiring')
return
if self._action == CONTINUE and self.continuous:
print('CONTINUE')
self.start_acquisition()
self.set_status(IDLE, 'acquiring')
return
if self._action == GO:
print('pending GO')
self.start_acquisition()
self._action = DONE_GO
self.set_status(BUSY, 'acquiring')
return
if self._action == DONE_GO:
self._action = WAIT_GO
self._wait_until = time.time() + 2
self.set_status(IDLE, 'paused')
return
if self._action == WAIT_GO:
if time.time() > self._wait_until:
self._action = CONTINUE
self.start_acquisition()
self.set_status(IDLE, 'acquiring')
def set_status(self, *status):
self._status = status
if self._status != self.status:
self.read_status()
def read_status(self):
return self._status
def read_value(self):
if self.imod:
self.imod.value = self._iq.real
if self.qmod:
self.qmod.value = self._iq.imag
return self._iq.real, self._iq.imag
@Command
def go(self):
"""start acquisition"""
self.log.info('go %.3f', time.time() % 1.0)
if self._busy:
self._action = GO
else:
self._action = DONE_GO
self.start_acquisition()
self._status = BUSY, 'acquiring'
self.read_status()
def start_acquisition(self):
self.log.info('start %.3f', time.time() % 1.0)
freq = self.freq.read_value()
self.sr = round(self.periods * self.adq.sample_rate / freq)
delay_samples = round(self.input_delay * self.adq.sample_rate * 1e-9)
self.adq.init(self.sr + delay_samples, 1)
self.adq.start(RUSdata(self.adq, freq, self.periods, delay_samples))
self._busy = True
self.setFastPoll(True, self._fast_poll)
class RUSqual(RUS):
"""version with additional info about quality of input and output signal"""
input_phase_stddev = Parameter('input signal quality', FloatRange(unit='rad'), default=0)
output_phase_slope = Parameter('output signal phase slope', FloatRange(unit='rad/sec'), default=0)
output_amp_slope = Parameter('output signal amplitude change', FloatRange(unit='1/sec'), default=0)
def get_quality_info(self, data):
qual = data.get_quality()
self.input_phase_stddev = qual.input_stddev.imag
self.output_phase_slope = qual.output_slope.imag
self.output_amp_slope = qual.output_slope.real