frappy/frappy_psi/ultrasound.py
Markus Zolliker 5d175b89ca frappy_psi.ultrasound: add input_delay and other improvments
Change-Id: I6cb5690d82d96d6775fcb649fc633c4039932463
2025-03-19 15:29:17 +01:00

416 lines
16 KiB
Python

# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Markus Zolliker <markus.zolliker@psi.ch>
# *****************************************************************************
"""frappy support for ultrasound"""
import math
import os
import time
import numpy as np
from frappy_psi.adq_mr import Adq, PEdata, RUSdata
from frappy.core import Attached, BoolType, Done, FloatRange, HasIO, StatusType, \
IntRange, Module, Parameter, Readable, Writable, Drivable, StringIO, StringType, \
IDLE, BUSY, DISABLED, WARN, ERROR, TupleOf, ArrayOf, Command, Attached, EnumType
from frappy.properties import Property
# from frappy.modules import Collector
Collector = Readable
def fname_from_time(t, extension):
tm = time.localtime(t)
dirname = os.path.join('..', 'data', time.strftime("%Y-%m-%d_%H", tm))
filename = time.strftime("%Y-%m-%d_%H-%M-%S_", tm)
filename = filename + ("%.1f" % t)[-1]
if not os.path.isdir(dirname):
os.makedirs(dirname)
return os.path.join(dirname, filename)
class Roi(Readable):
main = Attached()
value = Parameter('amplitude', FloatRange(), default=0)
phase = Parameter('phase', FloatRange(unit='deg'), default=0)
i = Parameter('in phase', FloatRange(), default=0)
q = Parameter('out of phase', FloatRange(), default=0)
time = Parameter('start time', FloatRange(unit='nsec'), readonly=False)
size = Parameter('interval (symmetric around time)', FloatRange(unit='nsec'), readonly=False)
enable = Parameter('calculate this roi', BoolType(), readonly=False, default=True)
pollinterval = Parameter(export=False)
interval = (0, 0)
def initModule(self):
super().initModule()
self.main.register_roi(self)
self.calc_interval()
def calc_interval(self):
self.interval = (self.time - 0.5 * self.size, self.time + 0.5 * self.size)
def read_status(self):
return (IDLE, '') if self.enable else (DISABLED, 'disabled')
def write_time(self, value):
self.time = value
self.calc_interval()
return Done
def write_size(self, value):
self.size = value
self.calc_interval()
return Done
class Pars(Module):
description = 'relevant parameters from SEA'
timestamp = Parameter('unix timestamp', StringType(), default='0', readonly=False)
temperature = Parameter('T', FloatRange(unit='K'), default=0, readonly=False)
mf = Parameter('field', FloatRange(unit='T'), default=0, readonly=False)
sr = Parameter('rotation angle', FloatRange(unit='deg'), default=0, readonly=False)
class FreqStringIO(StringIO):
end_of_line = '\r'
class Frequency(HasIO, Writable):
value = Parameter('frequency', unit='Hz')
amp = Parameter('amplitude (VPP)', FloatRange(unit='V'), readonly=False)
output = Parameter('output: L or R', EnumType(L=1, R=0), readonly=False, default='L')
last_change = 0
ioClass = FreqStringIO
dif = None
_freq = None
def register_dif(self, dif):
self.dif = dif
def read_value(self):
if self._freq is None:
self._freq = float(self.communicate('FREQ?'))
return self._freq
def write_target(self, value):
self._freq = float(self.communicate('FREQ %.15g;FREQ?' % value))
self.last_change = time.time()
if self.dif:
self.dif.read_value()
self.read_value()
return self._freq
def write_amp(self, amp):
reply = self.communicate(f'AMP{self.output.name} {amp} VPP;AMP{self.output.name}? VPP')
return float(reply)
def read_amp(self):
reply = self.communicate(f'AMP{self.output.name}? VPP')
return float(reply)
class FrequencyDif(Readable):
freq = Attached(Frequency)
base = Parameter('base frequency', FloatRange(unit='Hz'), default=0)
value = Parameter('difference to base frequency', FloatRange(unit='Hz'), default=0)
def initModule(self):
super().initModule()
self.freq.register_dif(self)
def read_value(self):
return self.freq - self.base
class Base:
freq = Attached()
sr = Parameter('samples per record', datatype=IntRange(1, 1E9), default=16384)
adq = None
def shutdownModule(self):
if self.adq:
self.adq.deletecu()
self.adq = None
class PulseEcho(Base):
value = Parameter("t, i, q, pulse curves",
TupleOf(*[ArrayOf(FloatRange(), 0, 16283) for _ in range(4)]), default=[[]] * 4)
nr = Parameter('number of records', datatype=IntRange(1, 9999), default=500)
bw = Parameter('bandwidth lowpassfilter', datatype=FloatRange(unit='Hz'), default=10E6)
control = Parameter('control loop on?', BoolType(), readonly=False, default=True)
time = Parameter('pulse start time', FloatRange(unit='nsec'),
readonly=False)
size = Parameter('pulse length (starting from time)', FloatRange(unit='nsec'),
readonly=False)
pulselen = Parameter('adjusted pulse length (integer number of periods)', FloatRange(unit='nsec'), default=1)
_starttime = None
def initModule(self):
super().initModule()
self.adq = Adq()
self.adq.init(self.sr, self.nr)
self.roilist = []
def write_nr(self, value):
self.adq.init(self.sr, value)
def write_sr(self, value):
self.adq.init(value, self.nr)
def write_bw(self, value):
self.adq.bw_cutoff = value
def register_roi(self, roi):
self.roilist.append(roi)
# TODO: fix
# def go(self):
# self._starttime = time.time()
# self.adq.start()
def read_value(self):
# TODO: data = self.get_data()
if self.get_rawdata(): # new data available
roilist = [r for r in self.roilist if r.enable]
freq = self.freq.value
gates = self.adq.gates_and_curves(self._data, freq,
(self.time, self.time + self.size),
[r.interval for r in roilist])
for i, roi in enumerate(roilist):
roi.i = a = gates[i][0]
roi.q = b = gates[i][1]
roi.value = math.sqrt(a ** 2 + b ** 2)
roi.phase = math.atan2(a, b) * 180 / math.pi
return self.adq.curves
# TODO: CONTROL
# inphase = self.roilist[0].i
# if self.control:
# newfreq = freq + inphase * self.slope - self.basefreq
# # step = sorted((-self.maxstep, inphase * self.slope, self.maxstep))[1]
# if self.old:
# fdif = freq - self.old[0]
# idif = inphase - self.old[1]
# if abs(fdif) >= self.minstep:
# self.slope = - fdif / idif
# else:
# fdif = 0
# idif = 0
# newfreq = freq + self.minstep
# self.old = (freq, inphase)
# if self.skipctrl > 0: # do no control for some time after changing frequency
# self.skipctrl -= 1
# elif self.control:
# self.freq = sorted((self.freq - self.maxstep, newfreq, self.freq + self.maxstep))[1]
CONTINUE = 0
GO = 1
DONE_GO = 2
WAIT_GO = 3
class RUS(Base, Collector):
freq = Attached()
imod = Attached(mandatory=False)
qmod = Attached(mandatory=False)
input_signal = Attached(mandatory=False)
output_signal = Attached(mandatory=False)
value = Parameter('averaged (I, Q) tuple', TupleOf(FloatRange(), FloatRange()))
status = Parameter(datatype=StatusType(Readable, 'BUSY'))
periods = Parameter('number of periods', IntRange(1, 9999), default=12)
input_delay = Parameter('throw away everything before this time',
FloatRange(unit='ns'), default=10000, readonly=False)
input_range = Parameter('input range (taking in to account attenuation)', FloatRange(unit='V'),
default=10, readonly=False)
output_range = Parameter('output range', FloatRange(unit='V'),
default=1, readonly=False)
input_phase_stddev = Parameter('input signal quality', FloatRange(unit='rad'), default=0)
output_phase_slope = Parameter('output signal phase slope', FloatRange(unit='rad/sec'), default=0)
output_amp_slope = Parameter('output signal amplitude change', FloatRange(unit='1/sec'), default=0)
input_amplitude = Parameter('input signal amplitude', FloatRange(unit='V'), default=0)
output_amplitude = Parameter('output signal amplitude', FloatRange(unit='V'), default=0)
phase = Parameter('phase', FloatRange(unit='deg'), default=0)
amp = Parameter('amplitude', FloatRange(), default=0)
continuous = Parameter('continuous mode', BoolType(), readonly=False, default=True)
pollinterval = Parameter(datatype=FloatRange(0, 120), default=1)
_starttime = None
_iq = 0
_wait_until = 0 # deadline for returning to continuous mode
_action = CONTINUE # one of CONTINUE, GO, DONE_GO, WAIT_GO
_status = IDLE, 'no data yet'
_busy = False # waiting for end of aquisition (not the same as self.status[0] == BUSY)
def initModule(self):
super().initModule()
self.adq = Adq()
self._ovr_rate = {}
self.freq.addCallback('value', self.update_freq)
# self.write_periods(self.periods)
def update_freq(self, value):
self.setFastPoll(True, 0.001)
def doPoll(self):
try:
data = self.adq.get_data()
except Exception as e:
self.set_status(ERROR, repr(e))
self._busy = False
self._action = WAIT_GO
self.wait_until = time.time() + 2
return
self.setFastPoll(False)
if data: # this is new data
self._data = data
for chan in data.channels:
if chan.ovr_rate:
self._ovr_rate[chan.name] = chan.ovr_rate * 100
else:
self._ovr_rate.pop(chan.name, None)
qual = data.get_quality()
if self.input_signal:
self.input_signal.value = np.round(data.channels[0].binned, 3)
if self.output_signal:
self.output_signal.value = np.round(data.channels[1].binned, 3)
self.input_phase_stddev = qual.input_stddev.imag
self.output_phase_slope = qual.output_slope.imag
self.output_amp_slope = qual.output_slope.real
self.input_amplitude = data.inp.amplitude * self.input_range
self.output_amplitude = data.out.amplitude * self.output_range
self._iq = iq = data.iq * self.output_range / self.input_range
self.phase = np.arctan2(iq.imag, iq.real) * 180 / np.pi
self.amp = np.abs(iq)
self.read_value()
self.set_status(IDLE, '')
elif self._busy:
self._busy = False
if self._action == DONE_GO:
self.set_status(BUSY, 'acquiring')
else:
self.set_status(IDLE, 'acquiring')
return
if self._action == CONTINUE and self.continuous:
self.start_acquisition()
self.set_status(IDLE, 'acquiring')
return
if self._action == GO:
self.start_acquisition()
self._action = DONE_GO
self.set_status(BUSY, 'acquiring')
return
if self._action == DONE_GO:
self._action = WAIT_GO
self._wait_until = time.time() + 2
self.set_status(IDLE, 'paused')
return
if self._action == WAIT_GO:
if time.time() > self._wait_until:
self._action = CONTINUE
self.start_acquisition()
self.set_status(IDLE, 'acquiring')
def set_status(self, *status):
self._status = status
if self._status != self.status:
self.read_status()
def read_status(self):
if self._ovr_rate and self._status[0] < WARN:
return WARN, 'overrange on %s' % ' and '.join(self._ovr_rate)
return self._status
def read_value(self):
if self.imod:
self.imod.value = self._iq.real
if self.qmod:
self.qmod.value = self._iq.imag
return self._iq.real, self._iq.imag
@Command
def go(self):
"""start aquisition"""
if self._busy:
self._action = GO
else:
self._action = DONE_GO
self.start_acquisition()
self._status = BUSY, 'acquiring'
self.read_status()
def start_acquisition(self):
freq = self.freq.read_value()
self.sr = round(self.periods * self.adq.sample_rate / freq)
delay_samples = round(self.input_delay * self.adq.sample_rate * 1e-9)
self.adq.init(self.sr + delay_samples, 1)
self.adq.start(RUSdata(self.adq, freq, self.periods, delay_samples))
self._busy = True
self.setFastPoll(True, 0.001)
class Signal(Readable):
value = Parameter('pulse', ArrayOf(FloatRange(), maxlen=9999))
class ControlLoop(Module):
roi = Attached(Roi)
maxstep = Parameter('max frequency step', FloatRange(unit='Hz'), readonly=False,
default=10000)
minstep = Parameter('min frequency step for slope calculation', FloatRange(unit='Hz'),
readonly=False, default=4000)
slope = Parameter('inphase/frequency slope', FloatRange(), readonly=False,
default=1e6)
# class Frequency(HasIO, Readable):
# pars = Attached()
# curves = Attached(mandatory=False)
# maxy = Property('plot y scale', datatype=FloatRange(), default=0.5)
#
# value = Parameter('frequency@I,q', datatype=FloatRange(unit='Hz'), default=0)
# basefreq = Parameter('base frequency', FloatRange(unit='Hz'), readonly=False)
# nr = Parameter('number of records', datatype=IntRange(1,10000), default=500)
# sr = Parameter('samples per record', datatype=IntRange(1,1E9), default=16384)
# freq = Parameter('target frequency', FloatRange(unit='Hz'), readonly=False)
# bw = Parameter('bandwidth lowpassfilter', datatype=FloatRange(unit='Hz'),default=10E6)
# amp = Parameter('amplitude', FloatRange(unit='dBm'), readonly=False)
# control = Parameter('control loop on?', BoolType(), readonly=False, default=True)
# rusmode = Parameter('RUS mode on?', BoolType(), readonly=False, default=False)
# time = Parameter('pulse start time', FloatRange(unit='nsec'),
# readonly=False)
# size = Parameter('pulse length (starting from time)', FloatRange(unit='nsec'),
# readonly=False)
# pulselen = Parameter('adjusted pulse length (integer number of periods)', FloatRange(unit='nsec'), default=1)
# maxstep = Parameter('max frequency step', FloatRange(unit='Hz'), readonly=False,
# default=10000)
# minstep = Parameter('min frequency step for slope calculation', FloatRange(unit='Hz'),
# readonly=False, default=4000)
# slope = Parameter('inphase/frequency slope', FloatRange(), readonly=False,
# default=1e6)
# plot = Parameter('create plot images', BoolType(), readonly=False, default=True)
# save = Parameter('save data', BoolType(), readonly=False, default=True)
# pollinterval = Parameter(datatype=FloatRange(0,120))