Files
frappy/frappy_psi/tnmr/OTFModule.py
2025-06-24 11:35:32 +02:00

288 lines
13 KiB
Python

# -*- coding: utf-8 -*-
"""
OTFModule
______________________
Version: 1.0
Authors: Davis Garrad (Paul Scherrer Institute, CH)
______________________
frappy-based module for generating and running pulse sequences on TNMR (Tecmag). The On-The-Fly module is meant to represent a SECoP node frappy-side, so it really just interacts with the TNMR software.
"""
#On-the-fly!
import frappy_psi.tnmr.tnmr_interface as te
import frappy_psi.tnmr.sequence_generation as seq_gen
import frappy.core as fc
import frappy
import win32com
import pythoncom
import numpy as np
import threading
import time
import os
import traceback
class ProgrammedSequence(fc.Readable):
"""An NMR device being driven by an instance of TNMR. Requires that an instance of TNMR is opened before creation.
Use
---
Generating a pulse sequence is as simple as setting the sequence_data parameter to be a list of sequence dictionaries, in the order you want them to be executed. The next step is to set the acquisition parameters (see below), including title, acquisition_time, pre_acquisition_time, post_acquisition_time, ringdown_time, and acq_phase_cycle.
Once all the parameters are set, you can call the frappy command and member function compile_and_run() (NOT the private member function __compile_and_run()).
Attributes
----------
value: an array of complexes representing the TNMR data return (technically inherited from Readable)
sequence_data: an array of structs: keys are { 'pulse_width': (width of pulse in us), 'pulse_height': (amplitude of pulse in a.u.), 'delay_time': (delay time in us), 'phase_cycle': (a str denoting a phase cycle, e.g., '0 1 2 3') }
Acquisition Parameters
----------------------
title: a title which will be embedded to the sequence files. Use this for identification.
acquisition_time: float (usecs) which describes the length of acquisition
ringdown_time: float (usecs) which describes the length of ringdown
pre_acquisition_time: float (usecs) which describes the length of time to wait after ringdown finishes (1u is okay)
post_acquisition_time: float (ms) which describes the length of time to wait after finishing acquisition
acq_phase_cycle: str, the phase cycle to run on acquisition (eg., '0 1 1 2', '0 1 2 3', '1 1 2 2 0 0 3 3 1 2 3 4', ...)
num_scans: int (ct), the number of 1D scans to take per sequence
obs_freq: float (MHz), the NMR frequency
Commands
--------
compile and run: finishes building the sequence, saves it and its configuration to files (timestamped), and starts data acquisition on TNMR (async)
Inherited Attributes
--------------------
name: from Module
logger: from Module
cfgdict: from Module
src: from Module
status: from Readable
value: from Readable
pollinterval: from Readable
"""
# inherited
value = fc.Parameter('data_return', fc.StructOf(reals=fc.ArrayOf(fc.FloatRange(), maxlen=4096), # real values
imags=fc.ArrayOf(fc.FloatRange(), maxlen=4096), # imag values
t =fc.ArrayOf(fc.FloatRange(), maxlen=4096)), # times (starting from zero)
default={ 'reals': [], 'imags': [], 't': [] })
status = fc.Parameter(datatype=frappy.datatypes.StatusType(fc.Readable, "DISABLED", 'PREPARED', 'BUSY'), default=('IDLE', 'ok - uncompiled'))
pollinterval = fc.Parameter(default=1)
# basic
title = fc.Parameter('title', fc.StringType(), default='Sequence', readonly=False)
sequence_data = fc.Parameter('sequence_config', fc.ArrayOf(fc.StructOf(pulse_width=fc.FloatRange(unit='u'),
pulse_height=fc.FloatRange(),
delay_time=fc.FloatRange(unit='u'),
phase_cycle=fc.StringType())), default=[], readonly=False)
# final details
acquisition_time = fc.Parameter('acquisition_time', fc.FloatRange(unit='u'), readonly=False, group='sequence_editor', default=204.8) # this is a limit set by the dwell limit and number of acquisition points
ringdown_time = fc.Parameter('ringdown_time', fc.FloatRange(unit='u'), readonly=False, group='sequence_editor', default=1)
pre_acquisition_time = fc.Parameter('pre_acquisition_time', fc.FloatRange(unit='u'), readonly=False, group='sequence_editor', default=1)
post_acquisition_time = fc.Parameter('post_acquisition_time', fc.FloatRange(unit='m'), readonly=False, group='sequence_editor', default=500)
acq_phase_cycle = fc.Parameter('acq_phase_cycle', fc.StringType(), readonly=False, group='sequence_editor', default='')
num_scans = fc.Parameter('num_scans', fc.IntRange(), readonly=False, group='sequence_editor', default=16)
obs_freq = fc.Parameter('obs_freq', fc.FloatRange(unit='MHz'), readonly=False, group='sequence_editor', default=213.16)
compiled_parameters = {} # so that we can store the values of parameters only when compiling, effectively giving us an instance of each parameter loaded into TNMR, as well as "targets" (those above)
inited = False
### SETUP
def tnmr(self):
'''Creates a new instance or retrieves a previously-made instance of the TNMR API wrapper.
Returns
-------
an instance of the TNMR API wrapper.
'''
if not(self.inited):
try:
self.ntnmr = te.TNMR()
self.inited = True
except Exception as e:
print(str(e), repr(e))
self.ntnmr = None
self.inited = False
return self.ntnmr
def initialReads(self):
pass
@fc.Command(description="Compile & Run", argument={'type': 'bool'})
def compile_and_run(self, thread=True):
'''Compiles and runs the currently loaded sequence (in sequence_data), populating this instance's value member with the results.
Parameters
----------
thread: bool, determines if a new thread is created and detached for this process. (default true)
'''
if(thread):
threading.Thread(target=lambda s=self: s.__compile_and_run()).start()
else:
self.__compile_and_run()
@fc.Command(description="Kill")
def kill(self):
'''Aborts the current scan, if one is running. Else, does nothing'''
self.stop()
def stop(self):
try:
self.tnmr().get_instance().Abort
self.status = ('IDLE', 'ok - killed')
except:
pass
### READ/WRITE
def write_title(self, t):
self.title = t
self.status = ('IDLE', 'ok - uncompiled')
return self.read_title()
def write_acquisition_time(self, t):
self.acquisition_time = t
self.status = ('IDLE', 'ok - uncompiled')
return self.read_acquisition_time()
def write_ringdown_time(self, t):
self.ringdown_time = t
self.status = ('IDLE', 'ok - uncompiled')
return self.read_ringdown_time()
def write_pre_acquisition_time(self, t):
self.pre_acquisition_time = t
self.status = ('IDLE', 'ok - uncompiled')
return self.read_pre_acquisition_time()
def write_post_acquisition_time(self, t):
self.post_acquisition_time = t
self.status = ('IDLE', 'ok - uncompiled')
return self.read_post_acquisition_time()
def write_acq_phase_cycle(self, t):
self.acq_phase_cycle = t
self.status = ('IDLE', 'ok - uncompiled')
return self.read_acq_phase_cycle()
def read_num_scans(self):
return self.tnmr().get_nmrparameter('Scans 1D')
def write_num_scans(self, t):
if(self.status[0] != 'BUSY'):
self.tnmr().set_nmrparameter('Scans 1D', t)
self.status = ('IDLE', 'ok - uncompiled')
return self.read_num_scans()
def read_obs_freq(self):
return self.tnmr().get_nmrparameter('Observe Freq.')
def write_obs_freq(self, t):
if(self.status[0] != 'BUSY'):
self.tnmr().set_nmrparameter('Observe Freq.', t)
self.status = ('IDLE', 'ok - uncompiled')
return self.read_obs_freq()
### PRIVATE (Utility)
def __compile_sequence(self):
'''Compiles the sequence loaded in sequence_data.
This involves:
1. creating the sequence table via seq_gen.get_single_pulse_block calls;
2. combining them all;
3. saving this sequence where TNMR can see it, and in a format it can read;
4. taking a copy of all the acquisition parameters (so that if they are changed mid-acquisition, no incorrect information is written to files)
5. telling TNMR to read it (i.e., tnmr().load_sequence()), reloading the dashboard and parameters in the process
5. giving TNMR the correct parameters to populate the new dashboard with
'''
if(self.status[0] != 'BUSY'):
self.status = ('BUSY', 'compiling')
# first, create the sequence
seq = seq_gen.get_initial_block()
i = 0
for s in self.sequence_data:
seq = seq_gen.combine_blocks(seq, seq_gen.get_single_pulse_block(f'pulse_{i}', str(s['pulse_width']) + 'u',
str(s['pulse_height']),
str(s['delay_time']) + 'u',
str(s['phase_cycle'])))
i += 1
seq = seq_gen.combine_blocks(seq, seq_gen.get_final_block(str(self.ringdown_time) + 'u',
str(self.pre_acquisition_time) + 'u',
str(self.acquisition_time) + 'u',
str(self.post_acquisition_time) + 'm',
str(self.acq_phase_cycle)))
# then, save the thing
filepath = os.getcwd()
filename = self.title + f'_{time.time()}'
filename = filepath + '/' + filename.replace('.','')
seq_gen.save_sequence(filename, seq)
seq_gen.save_sequence_cfg(filename, seq)
dashboard_params = { 'Observe Freq.': self.read_obs_freq(),
'Scans 1D': self.read_num_scans(),
}
self.compiled_parameters['ringdown_time'] = self.ringdown_time
self.compiled_parameters['pre_acquisition_time'] = self.pre_acquisition_time
self.compiled_parameters['acquisition_time'] = self.acquisition_time
self.compiled_parameters['post_acquisition_time'] = self.post_acquisition_time
self.compiled_parameters['acq_phase_cycle'] = self.acq_phase_cycle
self.compiled_parameters['num_scans'] = self.read_num_scans()
self.compiled_parameters['obs_freq'] = self.read_obs_freq()
# then, load the thing into TNMR
self.tnmr().load_sequence(filename)
time.sleep(1.0) # hardware module issue???
# load some parameters back to TNMR
for key, val in dashboard_params.items():
self.tnmr().set_nmrparameter(key, val)
time.sleep(0.5)
# finally, let ourselves know we're ready
self.status = ('PREPARED', 'compiled')
else:
traceback.print_exc()
def __zero_go(self):
'''Tells TNMR to acquire data. Only call after __compile_sequence().'''
if(self.status[0] != 'BUSY'):
self.status = ('BUSY', 'acquiring')
self.tnmr().ZeroGo(lock=True, interval=0.5)
newvals = {}
newvals['reals'] = self.tnmr().get_data()[0]
newvals['imags'] = self.tnmr().get_data()[1]
newvals['t'] = [ self.compiled_parameters['acquisition_time'] * i/self.compiled_parameters['num_scans'] for i in range(0, self.compiled_parameters['num_scans']) ]
self.value = newvals
self.status = ('PREPARED', 'compiled')
def __compile_and_run(self, thread=True):
'''Compiles and runs the currently-loaded sequence
Parameters
----------
thread: bool, determines if this should open a child thread and detach the process
'''
self.__compile_sequence()
time.sleep(0.5)
self.__zero_go()