frappy_psi.ultrasound: after rework (still wip)

Change-Id: I200cbeca2dd0f030a01a78ba4d38c342c3c8c8e3
This commit is contained in:
2025-03-17 09:37:13 +01:00
parent fdc868c2d7
commit 7c60daa568
2 changed files with 224 additions and 122 deletions

View File

@ -25,9 +25,9 @@ import time
import numpy as np
from frappy_psi.adq_mr import Adq, PEdata, RUSdata
from frappy.core import Attached, BoolType, Done, FloatRange, HasIO, \
from frappy.core import Attached, BoolType, Done, FloatRange, HasIO, StatusType, \
IntRange, Module, Parameter, Readable, Writable, Drivable, StringIO, StringType, \
IDLE, BUSY, DISABLED, ERROR, TupleOf, ArrayOf, Command
IDLE, BUSY, DISABLED, WARN, ERROR, TupleOf, ArrayOf, Command, Attached
from frappy.properties import Property
#from frappy.modules import Collector
@ -100,15 +100,22 @@ class Frequency(HasIO, Writable):
last_change = 0
ioClass = FreqStringIO
dif = None
_freq = None
def register_dif(self, dif):
self.dif = dif
def read_value(self):
if self._freq is None:
self._freq = float(self.communicate('FREQ?'))
return self._freq
def write_target(self, value):
self.communicate('FREQ %.15g;FREQ?' % value)
self._freq = float(self.communicate('FREQ %.15g;FREQ?' % value))
self.last_change = time.time()
if self.dif:
self.dif.read_value()
return self._freq
def write_amp(self, amp):
reply = self.communicate('AMPR %g;AMPR?' % amp)
@ -132,30 +139,18 @@ class FrequencyDif(Readable):
return self.freq - self.base
class Base(Collector):
class Base:
freq = Attached()
adq = Attached(Adq)
sr = Parameter('samples per record', datatype=IntRange(1, 1E9), default=16384)
pollinterval = Parameter(datatype=FloatRange(0, 120)) # allow pollinterval = 0
_data = None
_data_args = None
adq = None
def read_status(self):
adqstate = self.adq.get_status()
if adqstate == Adq.BUSY:
return BUSY, 'acquiring'
if adqstate == Adq.UNDEFINED:
return ERROR, 'no data yet'
if adqstate == Adq.READY:
return IDLE, 'new data available'
return IDLE, ''
def shutdownModule(self):
if self.adq:
print('shutdownModule')
self.adq.deletecu()
print('shutdoneModule done')
self.adq = None
def get_data(self):
data = self.adq.get_data(*self._data_args)
if id(data) != id(self._data):
self._data = data
return data
return None
class PulseEcho(Base):
@ -170,7 +165,7 @@ class PulseEcho(Base):
readonly=False)
pulselen = Parameter('adjusted pulse length (integer number of periods)', FloatRange(unit='nsec'), default=1)
starttime = None
_starttime = None
def initModule(self):
super().initModule()
@ -190,11 +185,13 @@ class PulseEcho(Base):
def register_roi(self, roi):
self.roilist.append(roi)
def go(self):
self.starttime = time.time()
self.adq.start()
# TODO: fix
# def go(self):
# self._starttime = time.time()
# self.adq.start()
def read_value(self):
# TODO: data = self.get_data()
if self.get_rawdata(): # new data available
roilist = [r for r in self.roilist if r.enable]
freq = self.freq.value
@ -229,54 +226,141 @@ class PulseEcho(Base):
# self.freq = sorted((self.freq - self.maxstep, newfreq, self.freq + self.maxstep))[1]
class RUS(Base):
value = Parameter('averaged (I, Q) tuple', TupleOf(FloatRange(), FloatRange()))
periods = Parameter('number of periods', IntRange(1, 9999), default=12)
scale = Parameter('scale,taking into account input attenuation', FloatRange(), default=0.1)
input_phase_stddev = Parameter('input signal quality', FloatRange(unit='rad'))
output_phase_slope = Parameter('output signal phase slope', FloatRange(unit='rad/sec'))
output_amp_slope = Parameter('output signal amplitude change', FloatRange(unit='1/sec'))
phase = Parameter('phase', FloatRange(unit='deg'))
amp = Parameter('amplitude', FloatRange())
CONTINUE = 0
GO = 1
DONE_GO = 2
WAIT_GO = 3
starttime = None
_data_args = None
class RUS(Base, Collector):
freq = Attached()
imod = Attached(mandatory=False)
qmod = Attached(mandatory=False)
value = Parameter('averaged (I, Q) tuple', TupleOf(FloatRange(), FloatRange()))
status = Parameter(datatype=StatusType(Readable, 'BUSY'))
periods = Parameter('number of periods', IntRange(1, 9999), default=12)
input_range = Parameter('input range (taking in to account attenuation)', FloatRange(unit='V'), default=0.1, readonly=False)
output_range = Parameter('output range', FloatRange(unit='V'), default=0.1, readonly=False)
input_phase_stddev = Parameter('input signal quality', FloatRange(unit='rad'), default=0)
output_phase_slope = Parameter('output signal phase slope', FloatRange(unit='rad/sec'), default=0)
output_amp_slope = Parameter('output signal amplitude change', FloatRange(unit='1/sec'), default=0)
input_amplitude = Parameter('input signal amplitude', FloatRange(unit='V'), default=0)
output_amplitude = Parameter('output signal amplitude', FloatRange(unit='V'), default=0)
phase = Parameter('phase', FloatRange(unit='deg'), default=0)
amp = Parameter('amplitude', FloatRange(), default=0)
continuous = Parameter('continuous mode', BoolType(), readonly=False, default=True)
pollinterval = Parameter(default=1)
_starttime = None
_iq = 0
_wait_until = 0 # deadline for returning to continuous mode
_action = CONTINUE # one of CONTINUE, GO, DONE_GO, WAIT_GO
_status = IDLE, 'no data yet'
_busy = False # waiting for end of aquisition (not the same as self.status[0] == BUSY)
def initModule(self):
super().initModule()
self.adq = Adq()
self._ovr_rate = {}
# self.write_periods(self.periods)
def read_value(self):
if self._data_args is None:
return self.value # or may we raise as no value is defined yet?
data = self.get_data(RUSdata, *self._data_args)
if data:
# data available
data.calc_quality()
self.input_phase_stddev = data.input_stddev.imag
self.output_phase_slope = data.output_slope.imag
self.output_amp_slope = data.output_slope.real
def doPoll(self):
try:
data = self.adq.get_data()
except Exception as e:
self.set_status(ERROR, repr(e))
self._busy = False
self._action = WAIT_GO
self.wait_until = time.time() + 2
return
iq = data.iq * self.scale
self.setFastPoll(False)
if data: # this is new data
self._data = data
for chan in data.channels:
if chan.ovr_rate:
self._ovr_rate[chan.name] = chan.ovr_rate * 100
else:
self._ovr_rate.pop(chan.name, None)
qual = data.get_quality()
self.input_phase_stddev = qual.input_stddev.imag
self.output_phase_slope = qual.output_slope.imag
self.output_amp_slope = qual.output_slope.real
self.input_amplitude = data.inp.amplitude / 2 ** 15 * self.input_range
self.output_amplitude = data.out.amplitude / 2 ** 15 * self.output_range
self._iq = iq = data.iq * self.output_range / self.input_range
self.phase = np.arctan2(iq.imag, iq.real) * 180 / np.pi
self.amp = np.abs(iq.imag, iq.real)
return iq.real, iq.imag
return self.value
self.amp = np.abs(iq)
self.read_value()
self.set_status(IDLE, '')
elif self._busy:
self._busy = False
if self._action == DONE_GO:
self.set_status(BUSY, 'acquiring')
else:
self.set_status(IDLE, 'acquiring')
return
if self._action == CONTINUE and self.continuous:
self.start_acquisition()
self.set_status(IDLE, 'acquiring')
return
if self._action == GO:
self.start_acquisition()
self._action = DONE_GO
self.set_status(BUSY, 'acquiring')
return
if self._action == DONE_GO:
self._action = WAIT_GO
self._wait_until = time.time() + 2
self.set_status(IDLE, 'paused')
return
if self._action == WAIT_GO:
if time.time() > self._wait_until:
self._action = CONTINUE
self.start_acquisition()
self.set_status(IDLE, 'acquiring')
def set_status(self, *status):
self._status = status
if self._status != self.status:
self.read_status()
def read_status(self):
if self._ovr_rate and self._status[0] < WARN:
return WARN, 'overrange on %s' % ' and '.join(self._ovr_rate)
return self._status
def read_value(self):
if self.imod:
self.imod.value = self._iq.real
if self.qmod:
self.qmod.value = self._iq.imag
return self._iq.real, self._iq.imag
@Command
def go(self):
self.starttime = time.time()
freq = self.freq.value
self._data_args = (RUSdata, freq, self.periods)
"""start aquisition"""
if self._busy:
self._action = GO
else:
self._action = DONE_GO
self.start_acquisition()
self._status = BUSY, 'acquiring'
self.read_status()
def start_acquisition(self):
freq = self.freq.read_value()
self.sr = round(self.periods * self.adq.sample_rate / freq)
self.adq.init(self.sr, 1)
self.adq.start()
self.read_status()
self.adq.start(RUSdata(self.adq, freq, self.periods))
self._busy = True
self.setFastPoll(True, 0.001)
class ControlLoop:
class ControlLoop(Module):
roi = Attached(Roi)
maxstep = Parameter('max frequency step', FloatRange(unit='Hz'), readonly=False,
default=10000)
default=10000)
minstep = Parameter('min frequency step for slope calculation', FloatRange(unit='Hz'),
readonly=False, default=4000)
slope = Parameter('inphase/frequency slope', FloatRange(), readonly=False,