# ***************************************************************************** # This program is free software; you can redistribute it and/or modify it under # the terms of the GNU General Public License as published by the Free Software # Foundation; either version 2 of the License, or (at your option) any later # version. # # This program is distributed in the hope that it will be useful, but WITHOUT # ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS # FOR A PARTICULAR PURPOSE. See the GNU General Public License for more # details. # # You should have received a copy of the GNU General Public License along with # this program; if not, write to the Free Software Foundation, Inc., # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # # Module authors: # Markus Zolliker # ***************************************************************************** """frappy support for ultrasound""" import math import os import time import numpy as np from frappy_psi.adq_mr import Adq, PEdata, RUSdata from frappy.core import Attached, BoolType, Done, FloatRange, HasIO, StatusType, \ IntRange, Module, Parameter, Readable, Writable, StatusType, StringIO, StringType, \ IDLE, BUSY, DISABLED, WARN, ERROR, TupleOf, ArrayOf, Command, Attached, EnumType ,\ Drivable from frappy.properties import Property from frappy.lib import clamp # from frappy.modules import Collector Collector = Readable def fname_from_time(t, extension): tm = time.localtime(t) dirname = os.path.join('..', 'data', time.strftime("%Y-%m-%d_%H", tm)) filename = time.strftime("%Y-%m-%d_%H-%M-%S_", tm) filename = filename + ("%.1f" % t)[-1] if not os.path.isdir(dirname): os.makedirs(dirname) return os.path.join(dirname, filename) class Roi(Readable): main = Attached() a = Attached(mandatory=False) # amplitude Readable p = Attached(mandatory=False) # phase Readable i = Attached(mandatory=False) # i Readable q = Attached(mandatory=False) # amplitude Readable value = Parameter('i, q', TupleOf(FloatRange(), FloatRange()), default=(0, 0)) time = Parameter('start time', FloatRange(unit='nsec'), readonly=False) size = Parameter('interval (symmetric around time)', FloatRange(unit='nsec'), readonly=False) enable = Parameter('calculate this roi', BoolType(), readonly=False, default=True) pollinterval = Parameter(export=False) interval = (0, 0) def initModule(self): super().initModule() self.main.register_roi(self) self.calc_interval() def calc_interval(self): self.interval = (self.time - 0.5 * self.size, self.time + 0.5 * self.size) def read_status(self): return (IDLE, '') if self.enable else (DISABLED, 'disabled') def write_time(self, value): self.time = value self.calc_interval() return Done def write_size(self, value): self.size = value self.calc_interval() return Done class ControlRoi(Roi, Writable): freq = Attached() # target is not releavnt -> always moving to I to zero target = Parameter(datatype=Roi.value.datatype, export=False) maxstep = Parameter('max frequency step', FloatRange(unit='Hz'), readonly=False, default=10000) minstep = Parameter('min frequency step for slope calculation', FloatRange(unit='Hz'), readonly=False, default=4000) slope = Parameter('inphase/frequency slope', FloatRange(), readonly=False, default=1e6) control_active = Parameter('are we controlling?', BoolType(), readonly=False, default=0) _freq_target = None _skipctrl = 2 _old = None def doPoll(self): inphase = self.value[0] freq = self.freq.target if freq != self._freq_target: self._freq_target = freq # do no control 2 times after changing frequency self._skipctrl = 2 if self.control_active: if self._old: newfreq = freq + inphase * self.slope fdif = freq - self._old[0] if abs(fdif) >= self.minstep: idif = inphase - self._old[1] self.slope = - fdif / idif else: # do a 'test' step newfreq = freq + self.minstep self.old = (freq, inphase) if self._skipctrl > 0: # do no control for some time after changing frequency self._skipctrl -= 1 elif self.control_active: self._freq_target = self.freq.write_target(clamp(freq - self.maxstep, newfreq, freq + self.maxstep)) def write_target(self, value): self.control_active = True return 0 class Pars(Module): description = 'relevant parameters from SEA' timestamp = Parameter('unix timestamp', StringType(), default='0', readonly=False) temperature = Parameter('T', FloatRange(unit='K'), default=0, readonly=False) mf = Parameter('field', FloatRange(unit='T'), default=0, readonly=False) sr = Parameter('rotation angle', FloatRange(unit='deg'), default=0, readonly=False) class FreqStringIO(StringIO): end_of_line = '\r\n' class Frequency(HasIO, Drivable): value = Parameter('frequency', unit='Hz') amp = Parameter('amplitude (VPP)', FloatRange(unit='V'), readonly=False) output = Parameter('output: L or R', EnumType(L=1, R=0), readonly=False, default='L') last_change = 0 ioClass = FreqStringIO dif = None _started = 0 _within_write_target = False _nopoll_until = 0 def doPoll(self): super().doPoll() if self.isBusy() and time.time() > self._started + 5: self.status = WARN, 'acquisition timeout' def register_dif(self, dif): self.dif = dif def read_value(self): if time.time() > self._nopoll_until or self.value == 0: self.value = float(self.communicate('FREQ?')) if self.dif: self.dif.read_value() return self.value def set_busy(self): """called by an acquisition module from a callback on value within write_target """ if self._within_write_target: self._started = time.time() self.log.info('set busy') self.status = BUSY, 'waiting for acquisition' def set_idle(self): if self.isBusy(): self.status = IDLE, '' def write_target(self, value): self._nopoll_until = time.time() + 10 try: self._within_write_target = True # may trigger busy=True from an acquisition module self.value = float(self.communicate('FREQ %.15g;FREQ?' % value)) self.last_change = time.time() if self.dif: self.dif.read_value() return self.value finally: self._within_write_target = False def write_amp(self, amp): self._nopoll_until = time.time() + 10 self.amp = float(self.communicate(f'AMP{self.output.name} {amp} VPP;AMP{self.output.name}? VPP')) return self.amp def read_amp(self): if time.time() > self._nopoll_until or self.amp == 0: return float(self.communicate(f'AMP{self.output.name}? VPP')) return self.amp class FrequencyDif(Drivable): freq = Attached(Frequency) base = Parameter('base frequency', FloatRange(unit='Hz'), default=0) value = Parameter('difference to base frequency', FloatRange(unit='Hz'), default=0) def initModule(self): super().initModule() self.freq.register_dif(self) def write_value(self, target): self.freq.write_target(target + self.base) return self.value # this was updated in Frequency def read_value(self): return self.freq.value - self.base def read_status(self): return self.freq.read_status() class Base: freq = Attached() sr = Parameter('samples per record', datatype=IntRange(1, 1E9), default=16384) adq = None _rawsignal = None _fast_poll = 0.001 def shutdownModule(self): if self.adq: self.adq.deletecu() self.adq = None @Command(argument=TupleOf(FloatRange(unit='ns'), FloatRange(unit='ns'), IntRange(0,99999)), result=TupleOf(FloatRange(), ArrayOf(ArrayOf(IntRange(-0x7fff, 0x7fff), 0, 99999)))) def get_signal(self, start, end, npoints): """get signal :param start: start time (ns) :param end: end time (ns) :param npoints: hint for number of data points :return: (, array of array of y) for performance reasons the result data is rounded to int16 """ # convert ns to samples sr = self.adq.sample_rate * 1e-9 istart = round(start * sr) iend = min(self.sr, round(end * sr)) nbin = max(1, round((iend - istart) / npoints)) iend = iend // nbin * nbin return (nbin / sr, [np.round(ch[istart:iend].reshape((-1, nbin)).mean(axis=1)) for ch in self._rawsignal]) class PulseEcho(Base, Readable): value = Parameter(default=0) nr = Parameter('number of records', datatype=IntRange(1, 9999), default=500) bw = Parameter('bandwidth lowpassfilter', datatype=FloatRange(unit='Hz'), default=10E6) time = Parameter('pulse start time', FloatRange(unit='nsec'), readonly=False) size = Parameter('pulse length (starting from time)', FloatRange(unit='nsec'), readonly=False) pulselen = Parameter('adjusted pulse length (integer number of periods)', FloatRange(unit='nsec'), default=1) # curves = Attached(mandatory=False) pollinterval = Parameter('poll interval', datatype=FloatRange(0,120)) _starttime = None def initModule(self): super().initModule() self.adq = Adq() self.adq.init(self.sr, self.nr) self.roilist = [] self.setFastPoll(True, self._fast_poll) def doPoll(self): try: data = self.adq.get_data() except Exception as e: self.status = ERROR, repr(e) return if data is None: if self.adq.busy: return self.adq.start(PEdata(self.adq)) self.setFastPoll(True, self._fast_poll) return roilist = [r for r in self.roilist if r.enable] freq = self.freq.read_value() if not freq: self.log.info('freq=0') return gates, curves = data.gates_and_curves( freq, (self.time, self.time + self.size), [(r.time, r.time + r.size) for r in roilist], self.bw) for i, roi in enumerate(roilist): a = gates[i][0] b = gates[i][1] roi.value = a, b if roi.i: roi.i.value = a if roi.q: roi.q.value = b if roi.a: roi.a.value = math.sqrt(a ** 2 + b ** 2) if roi.p: roi.p.value = math.atan2(a, b) * 180 / math.pi self._curves = curves self._rawsignal = data.rawsignal @Command(result=TupleOf(*[ArrayOf(FloatRange(), 0, 99999) for _ in range(4)])) def get_curves(self): """retrieve curves""" return self._curves def write_nr(self, value): self.adq.init(self.sr, value) def write_sr(self, value): self.adq.init(value, self.nr) def write_bw(self, value): self.adq.bw_cutoff = value def register_roi(self, roi): self.roilist.append(roi) CONTINUE = 0 GO = 1 DONE_GO = 2 WAIT_GO = 3 class RUS(Base, Collector): freq = Attached() imod = Attached(mandatory=False) qmod = Attached(mandatory=False) value = Parameter('averaged (I, Q) tuple', TupleOf(FloatRange(), FloatRange())) status = Parameter(datatype=StatusType(Readable, 'BUSY')) periods = Parameter('number of periods', IntRange(1, 999999), default=12, readonly=False) input_delay = Parameter('throw away everything before this time', FloatRange(unit='ns'), default=10000, readonly=False) input_range = Parameter('input range (taking into account attenuation)', FloatRange(unit='V'), default=10, readonly=False) output_range = Parameter('output range', FloatRange(unit='V'), default=1, readonly=False) input_amplitude = Parameter('input signal amplitude', FloatRange(unit='V'), default=0) output_amplitude = Parameter('output signal amplitude', FloatRange(unit='V'), default=0) phase = Parameter('phase', FloatRange(unit='deg'), default=0) amp = Parameter('amplitude', FloatRange(), default=0) continuous = Parameter('continuous mode', BoolType(), readonly=False, default=True) pollinterval = Parameter(datatype=FloatRange(0, 120), default=5) _starttime = None _iq = 0 _wait_until = 0 # deadline for returning to continuous mode _action = CONTINUE # one of CONTINUE, GO, DONE_GO, WAIT_GO _status = IDLE, 'no data yet' _busy = False # waiting for end of aquisition (not the same as self.status[0] == BUSY) _requested_freq = None def initModule(self): super().initModule() self.adq = Adq() self.freq.addCallback('value', self.update_freq) self.freq.addCallback('target', self.update_freq_target) # self.write_periods(self.periods) def update_freq_target(self, value): self.go() def update_freq(self, value): self.setFastPoll(True, self._fast_poll) self._requested_freq = value self.freq.set_busy() # is only effective when the update was trigger within freq.write_target def get_quality_info(self, data): """hook for RESqual""" data.timer.show() def doPoll(self): try: data = self.adq.get_data() except Exception as e: self.set_status(ERROR, repr(e)) self._busy = False self._action = WAIT_GO self.wait_until = time.time() + 2 return if data: # this is new data self._busy = False self.get_quality_info(data) # hook for RUSqual self._rawsignal = data.rawsignal self.input_amplitude = data.inp.amplitude * self.input_range self.output_amplitude = data.out.amplitude * self.output_range self._iq = iq = data.iq * self.output_range / self.input_range self.phase = np.arctan2(iq.imag, iq.real) * 180 / np.pi self.amp = np.abs(iq) self.read_value() self.set_status(IDLE, '') if self.freq.isBusy(): if data.freq == self._requested_freq: self.log.info('set freq idle %.3f', time.time() % 1.0) self.freq.set_idle() else: self.log.warn('freq does not match: requested %.14g, from data: %.14g', self._requested_freq, data.freq) else: self.log.info('freq not busy %.3f', time.time() % 1.0) if self._action == CONTINUE: self.setFastPoll(False) self.log.info('slow') return elif self._busy: if self._action == DONE_GO: self.log.info('busy') self.set_status(BUSY, 'acquiring') else: self.set_status(IDLE, 'acquiring') return if self._action == CONTINUE and self.continuous: print('CONTINUE') self.start_acquisition() self.set_status(IDLE, 'acquiring') return if self._action == GO: print('pending GO') self.start_acquisition() self._action = DONE_GO self.set_status(BUSY, 'acquiring') return if self._action == DONE_GO: self._action = WAIT_GO self._wait_until = time.time() + 2 self.set_status(IDLE, 'paused') return if self._action == WAIT_GO: if time.time() > self._wait_until: self._action = CONTINUE self.start_acquisition() self.set_status(IDLE, 'acquiring') def set_status(self, *status): self._status = status if self._status != self.status: self.read_status() def read_status(self): return self._status def read_value(self): if self.imod: self.imod.value = self._iq.real if self.qmod: self.qmod.value = self._iq.imag return self._iq.real, self._iq.imag @Command def go(self): """start acquisition""" self.log.info('go %.3f', time.time() % 1.0) if self._busy: self._action = GO else: self._action = DONE_GO self.start_acquisition() self._status = BUSY, 'acquiring' self.read_status() def start_acquisition(self): self.log.info('start %.3f', time.time() % 1.0) freq = self.freq.read_value() self.sr = round(self.periods * self.adq.sample_rate / freq) delay_samples = round(self.input_delay * self.adq.sample_rate * 1e-9) self.adq.init(self.sr + delay_samples, 1) self.adq.start(RUSdata(self.adq, freq, self.periods, delay_samples)) self._busy = True self.setFastPoll(True, self._fast_poll) class RUSqual(RUS): """version with additional info about quality of input and output signal""" input_phase_stddev = Parameter('input signal quality', FloatRange(unit='rad'), default=0) output_phase_slope = Parameter('output signal phase slope', FloatRange(unit='rad/sec'), default=0) output_amp_slope = Parameter('output signal amplitude change', FloatRange(unit='1/sec'), default=0) def get_quality_info(self, data): qual = data.get_quality() self.input_phase_stddev = qual.input_stddev.imag self.output_phase_slope = qual.output_slope.imag self.output_amp_slope = qual.output_slope.real