Did some renaming to make things make sense
This commit is contained in:
396
frappy_psi/tnmr/TNMRModule.py
Normal file
396
frappy_psi/tnmr/TNMRModule.py
Normal file
@@ -0,0 +1,396 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
OTFModule
|
||||
______________________
|
||||
|
||||
Version: 1.0
|
||||
Authors: Davis Garrad (Paul Scherrer Institute, CH)
|
||||
______________________
|
||||
|
||||
frappy-based module for generating and running pulse sequences on TNMR (Tecmag). The On-The-Fly module is meant to represent a SECoP node frappy-side, so it really just interacts with the TNMR software.
|
||||
|
||||
"""
|
||||
#On-the-fly!
|
||||
|
||||
import frappy_psi.tnmr.tnmr_interface as te
|
||||
import frappy_psi.tnmr.sequence_generation as seq_gen
|
||||
|
||||
import frappy.core as fc
|
||||
import frappy
|
||||
|
||||
from frappy.errors import ProgrammingError
|
||||
|
||||
import win32com
|
||||
import pythoncom
|
||||
import numpy as np
|
||||
|
||||
import threading
|
||||
import time
|
||||
import os
|
||||
import traceback
|
||||
|
||||
TNMR_MAX_PULSES = 100
|
||||
|
||||
class TNMRModule(fc.Drivable): # Drivable only for kill() funcitonality
|
||||
"""An NMR device being driven by an instance of TNMR. Requires that an instance of TNMR is opened; if one is not, it will attempt to open one, though it is not guaranteed that this will always work.
|
||||
|
||||
Use
|
||||
---
|
||||
Generating a pulse sequence is as simple as setting the sequence_data parameter to be a list of sequence dictionaries, in the order you want them to be executed. The next step is to set the acquisition parameters (see below), including title, acquisition_time, pre_acquisition_time, post_acquisition_time, ringdown_time, and acq_phase_cycle.
|
||||
|
||||
Once all the parameters are set, you can call the frappy command and member function compile_and_run() (NOT the private member function __compile_and_run()).
|
||||
|
||||
Attributes
|
||||
----------
|
||||
value: an array of complexes representing the TNMR data return (technically inherited from Readable)
|
||||
sequence_data: an array of structs: keys are { 'pulse_width': (width of pulse in us), 'pulse_height': (amplitude of pulse in percentage - 30 is 30%, 0.3 is 0.3% etc.), 'delay_time': (delay time in us), 'phase_cycle': (a str denoting a phase cycle, e.g., '0 1 2 3') }
|
||||
|
||||
Acquisition Parameters
|
||||
----------------------
|
||||
These parameters can be set from a dictionary, using the update_parameters method/command.
|
||||
|
||||
title: a title which will be embedded to the sequence files. Use this for identification.
|
||||
acquisition_time: float (usecs) which describes the length of acquisition
|
||||
ringdown_time: float (usecs) which describes the length of ringdown
|
||||
pre_acquisition_time: float (usecs) which describes the length of time to wait after ringdown finishes (1u is okay)
|
||||
post_acquisition_time: float (ms) which describes the length of time to wait after finishing acquisition
|
||||
acq_phase_cycle: str, the phase cycle to run on acquisition (eg., '0 1 1 2', '0 1 2 3', '1 1 2 2 0 0 3 3 1 2 3 4', ...)
|
||||
num_acqs: int (ct), the number of 1D scans to take per sequence
|
||||
obs_freq: float (MHz), the NMR frequency
|
||||
|
||||
|
||||
Commands
|
||||
--------
|
||||
compile and run: finishes building the sequence, saves it and its configuration to files (timestamped), and starts data acquisition on TNMR (async)
|
||||
|
||||
Inherited Attributes
|
||||
--------------------
|
||||
name: from Module
|
||||
logger: from Module
|
||||
cfgdict: from Module
|
||||
src: from Module
|
||||
status: from Readable
|
||||
value: from Readable
|
||||
pollinterval: from Readable
|
||||
"""
|
||||
|
||||
# inherited
|
||||
value = fc.Parameter('data_return', fc.StructOf(reals=fc.ArrayOf(fc.FloatRange(), maxlen=4096), # real values
|
||||
imags=fc.ArrayOf(fc.FloatRange(), maxlen=4096), # imag values
|
||||
t =fc.ArrayOf(fc.FloatRange(), maxlen=4096)), # times (starting from zero)
|
||||
default={ 'reals': [], 'imags': [], 't': [] })
|
||||
target = fc.Parameter('dummy', fc.StructOf(reals=fc.ArrayOf(fc.FloatRange(), maxlen=4096), # real values
|
||||
imags=fc.ArrayOf(fc.FloatRange(), maxlen=4096), # imag values
|
||||
t =fc.ArrayOf(fc.FloatRange(), maxlen=4096)), # times (starting from zero)
|
||||
default={ 'reals': [], 'imags': [], 't': [] }, readonly=True, visibility='w--')
|
||||
|
||||
status = fc.Parameter(datatype=frappy.datatypes.StatusType(fc.Drivable, "DISABLED", 'PREPARED', 'BUSY'), default=('IDLE', 'ok - uncompiled'))
|
||||
pollinterval = fc.Parameter(default=1)
|
||||
|
||||
# basic
|
||||
title = fc.Parameter('title', fc.StringType(), default='sequence', readonly=False)
|
||||
sample = fc.Parameter('sample', fc.StringType(), default='', readonly=False)
|
||||
comments = fc.Parameter('comments', fc.StringType(), default='', readonly=False)
|
||||
nucleus = fc.Parameter('nucleus', fc.StringType(), default='', readonly=False)
|
||||
|
||||
sequence_length = fc.Parameter('sequence_length', fc.IntRange(), default=0, readonly=True)
|
||||
sequence_data = fc.Parameter('sequence_config', fc.ArrayOf(fc.StructOf(pulse_width=fc.FloatRange(unit='usecs'),
|
||||
pulse_height=fc.FloatRange(unit='%'),
|
||||
delay_time=fc.FloatRange(unit='usecs'),
|
||||
phase_cycle=fc.StringType()), minlen=0), default=[{'pulse_width':0,'pulse_height':0,'delay_time':0,'phase_cycle':''}]*TNMR_MAX_PULSES, readonly=False)
|
||||
num_acqs_actual = fc.Parameter('num_acqs', fc.IntRange(), readonly=True, default=0)
|
||||
|
||||
# final details
|
||||
acquisition_time = fc.Parameter('acquisition_time', fc.FloatRange(unit='usecs'), readonly=False, group='sequence_editor', default=204.8) # this is a limit set by the dwell limit and number of acquisition points
|
||||
num_acq_points = fc.Parameter('num_acq_points', fc.IntRange(), readonly=False, group='sequence_editor', default=1024)
|
||||
ringdown_time = fc.Parameter('ringdown_time', fc.FloatRange(unit='usecs'), readonly=False, group='sequence_editor', default=1)
|
||||
pre_acquisition_time = fc.Parameter('pre_acquisition_time', fc.FloatRange(unit='usecs'), readonly=False, group='sequence_editor', default=1)
|
||||
post_acquisition_time = fc.Parameter('post_acquisition_time', fc.FloatRange(unit='msecs'), readonly=False, group='sequence_editor', default=500)
|
||||
acq_phase_cycle = fc.Parameter('acq_phase_cycle', fc.StringType(), readonly=False, group='sequence_editor', default='')
|
||||
num_acqs = fc.Parameter('num_acqs', fc.IntRange(), readonly=False, group='sequence_editor', default=16)
|
||||
obs_freq = fc.Parameter('obs_freq', fc.FloatRange(unit='MHz'), readonly=False, group='sequence_editor', default=213.16)
|
||||
|
||||
compiled_parameters = {} # so that we can store the values of parameters only when compiling, effectively giving us an instance of each parameter loaded into TNMR, as well as "targets" (those above)
|
||||
inited = False
|
||||
starting = False
|
||||
approx_sequence_length = 0
|
||||
|
||||
### SETUP
|
||||
def tnmr(self):
|
||||
'''Creates a new instance or retrieves a previously-made instance of the TNMR API wrapper.
|
||||
|
||||
Returns
|
||||
-------
|
||||
an instance of the TNMR API wrapper.
|
||||
'''
|
||||
if not(self.inited):
|
||||
try:
|
||||
self.ntnmr = te.TNMR()
|
||||
self.inited = True
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
self.ntnmr = None
|
||||
self.inited = False
|
||||
self.status = ('ERROR', 'TNMR disconnected!')
|
||||
return self.ntnmr
|
||||
|
||||
def initialReads(self):
|
||||
pass
|
||||
|
||||
@fc.Command(description="Compile & Run", argument={'type': 'bool'})
|
||||
def compile_and_run(self, thread=True):
|
||||
'''Compiles and runs the currently loaded sequence (in sequence_data), populating this instance's value member with the results.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
thread: bool, determines if a new thread is created and detached for this process. (default true)
|
||||
'''
|
||||
if(thread):
|
||||
threading.Thread(target=lambda s=self: s.__compile_and_run()).start()
|
||||
else:
|
||||
self.__compile_and_run()
|
||||
|
||||
@fc.Command(description="Kill")
|
||||
def kill(self):
|
||||
'''Aborts the current scan, if one is running. Else, does nothing'''
|
||||
self.stop()
|
||||
|
||||
def stop(self):
|
||||
try:
|
||||
self.tnmr().get_instance().Abort()
|
||||
self.status = ('IDLE', 'ok - killed')
|
||||
except:
|
||||
traceback.print_exc()
|
||||
pass
|
||||
|
||||
@fc.Command(description='Updates any device parameters via a dictionary', argument={'type': 'dict'})
|
||||
def update_parameters(self, dct):
|
||||
for k, v in dct.items():
|
||||
if(hasattr(self, k)):
|
||||
setattr(self, k, v)
|
||||
else:
|
||||
self.logger.info(f'Bad TNMR parameter: {k}')
|
||||
|
||||
### READ/WRITE
|
||||
|
||||
def read_status(self):
|
||||
if not(self.inited):
|
||||
self.status = ('ERROR', 'TNMR disconnected!')
|
||||
else:
|
||||
if(self.starting):
|
||||
self.status = ('BUSY', 'starting')
|
||||
else:
|
||||
if(self.tnmr().acquisition_running()):
|
||||
self.status = ('BUSY', 'acquiring')
|
||||
elif(self.status[1] == 'acquiring'):
|
||||
# we've just finished acquiring, in frappy's perspective
|
||||
self.status = ('PREPARED', 'compiled')
|
||||
return self.status
|
||||
|
||||
def write_title(self, t):
|
||||
self.title = t
|
||||
self.status = ('IDLE', 'ok - uncompiled')
|
||||
return self.read_title()
|
||||
|
||||
def write_sample(self, t):
|
||||
self.sample = t
|
||||
self.status = ('IDLE', 'ok - uncompiled')
|
||||
return self.read_sample()
|
||||
|
||||
def write_comments(self, t):
|
||||
self.comments = t
|
||||
self.status = ('IDLE', 'ok - uncompiled')
|
||||
return self.read_comments()
|
||||
|
||||
def write_nucleus(self, t):
|
||||
self.nucleus = t
|
||||
self.status = ('IDLE', 'ok - uncompiled')
|
||||
return self.read_nucleus()
|
||||
|
||||
def write_acquisition_time(self, t):
|
||||
self.acquisition_time = t
|
||||
self.status = ('IDLE', 'ok - uncompiled')
|
||||
return self.read_acquisition_time()
|
||||
|
||||
def write_ringdown_time(self, t):
|
||||
self.ringdown_time = t
|
||||
self.status = ('IDLE', 'ok - uncompiled')
|
||||
return self.read_ringdown_time()
|
||||
|
||||
def write_pre_acquisition_time(self, t):
|
||||
self.pre_acquisition_time = t
|
||||
self.status = ('IDLE', 'ok - uncompiled')
|
||||
return self.read_pre_acquisition_time()
|
||||
|
||||
def write_post_acquisition_time(self, t):
|
||||
self.post_acquisition_time = t
|
||||
self.status = ('IDLE', 'ok - uncompiled')
|
||||
return self.read_post_acquisition_time()
|
||||
|
||||
def write_acq_phase_cycle(self, t):
|
||||
self.acq_phase_cycle = t
|
||||
self.status = ('IDLE', 'ok - uncompiled')
|
||||
return self.read_acq_phase_cycle()
|
||||
|
||||
def read_num_acqs(self):
|
||||
return int(self.tnmr().get_nmrparameter('Scans 1D'))
|
||||
|
||||
def write_num_acqs(self, t):
|
||||
if(self.status[0] != 'BUSY'):
|
||||
self.tnmr().set_nmrparameter('Scans 1D', t)
|
||||
self.status = ('IDLE', 'ok - uncompiled')
|
||||
return self.read_num_acqs()
|
||||
|
||||
def write_num_acq_points(self, t):
|
||||
if(self.status[0] != 'BUSY'):
|
||||
self.status = ('IDLE', 'ok - uncompiled')
|
||||
self.num_points = t
|
||||
return self.read_num_acq_points()
|
||||
|
||||
def read_obs_freq(self):
|
||||
return self.tnmr().get_nmrparameter('Observe Freq.')
|
||||
|
||||
def write_obs_freq(self, t):
|
||||
if(self.status[0] != 'BUSY'):
|
||||
self.tnmr().set_nmrparameter('Observe Freq.', t)
|
||||
self.status = ('IDLE', 'ok - uncompiled')
|
||||
return self.read_obs_freq()
|
||||
|
||||
def write_sequence_data(self, t):
|
||||
self.sequence_length = len(t)
|
||||
seq = []
|
||||
seq += t
|
||||
print(seq)
|
||||
seq += [{'pulse_width':0,'pulse_height':0,'delay_time':0,'phase_cycle':''}] * (TNMR_MAX_PULSES-self.sequence_length) # because nicos will only send the smallest size it has ever sent...
|
||||
self.sequence_data = seq
|
||||
|
||||
return self.read_sequence_data()
|
||||
|
||||
def read_value(self):
|
||||
newvals = {}
|
||||
try:
|
||||
d = self.tnmr().get_data()
|
||||
newvals['reals'] = d[0]
|
||||
newvals['imags'] = d[1]
|
||||
newvals['t'] = [ self.compiled_parameters['acquisition_time'] * i/len(d[0]) for i in range(0, len(d[0])) ]
|
||||
except:
|
||||
newvals['reals'] = []
|
||||
newvals['imags'] = []
|
||||
newvals['t'] = []
|
||||
return newvals
|
||||
|
||||
def read_num_acqs_actual(self):
|
||||
try:
|
||||
n = self.tnmr().get_nmrparameter('Actual Scans 1D')
|
||||
return int(n)
|
||||
except:
|
||||
return 0
|
||||
|
||||
### PRIVATE (Utility)
|
||||
def __compile_sequence(self):
|
||||
'''Compiles the sequence loaded in sequence_data.
|
||||
|
||||
This involves:
|
||||
1. creating the sequence table via seq_gen.get_single_pulse_block calls;
|
||||
2. combining them all;
|
||||
3. saving this sequence where TNMR can see it, and in a format it can read;
|
||||
4. taking a copy of all the acquisition parameters (so that if they are changed mid-acquisition, no incorrect information is written to files)
|
||||
5. telling TNMR to read it (i.e., tnmr().load_sequence()), reloading the dashboard and parameters in the process
|
||||
5. giving TNMR the correct parameters to populate the new dashboard with
|
||||
'''
|
||||
if(self.status[0] != 'BUSY'):
|
||||
self.status = ('BUSY', 'compiling')
|
||||
# first, create the sequence
|
||||
seq = seq_gen.get_initial_block()
|
||||
i = 0
|
||||
self.approx_sequence_length = 0
|
||||
for si in range(self.sequence_length):
|
||||
s = self.sequence_data[si]
|
||||
seq = seq_gen.combine_blocks(seq, seq_gen.get_single_pulse_block(f'pulse_{i}', str(s['pulse_width']) + 'u',
|
||||
str(s['pulse_height']),
|
||||
str(s['delay_time']) + 'u',
|
||||
str(s['phase_cycle'])))
|
||||
self.approx_sequence_length += float(s['delay_time'])*1e-6
|
||||
self.approx_sequence_length += float(s['pulse_width'])*1e-6
|
||||
i += 1
|
||||
seq = seq_gen.combine_blocks(seq, seq_gen.get_final_block(str(self.ringdown_time) + 'u',
|
||||
str(self.pre_acquisition_time) + 'u',
|
||||
str(self.acquisition_time) + 'u',
|
||||
str(self.post_acquisition_time) + 'm',
|
||||
str(self.acq_phase_cycle),
|
||||
str(self.num_acq_points)))
|
||||
|
||||
self.approx_sequence_length += float(self.acquisition_time)*1e-6
|
||||
self.approx_sequence_length += float(self.post_acquisition_time)*1e-6
|
||||
|
||||
# then, save the thing
|
||||
# save it in a reasonable location, within frappy
|
||||
filepath = os.path.dirname(os.path.realpath(__file__))
|
||||
filename = self.title + f'_{time.time()}'
|
||||
filename = filepath + '/sequences/' + filename.replace('.','')
|
||||
seq_gen.save_sequence(filename, seq)
|
||||
seq_gen.save_sequence_cfg(filename, seq)
|
||||
|
||||
dashboard_params = { 'Observe Freq.': self.read_obs_freq(),
|
||||
'Scans 1D': self.read_num_acqs(),
|
||||
}
|
||||
|
||||
self.compiled_parameters['ringdown_time'] = self.ringdown_time
|
||||
self.compiled_parameters['pre_acquisition_time'] = self.pre_acquisition_time
|
||||
self.compiled_parameters['acquisition_time'] = self.acquisition_time
|
||||
self.compiled_parameters['post_acquisition_time'] = self.post_acquisition_time
|
||||
self.compiled_parameters['acq_phase_cycle'] = self.acq_phase_cycle
|
||||
self.compiled_parameters['num_acqs'] = self.read_num_acqs()
|
||||
self.compiled_parameters['num_acq_points'] = self.read_num_acq_points()
|
||||
self.compiled_parameters['obs_freq'] = self.read_obs_freq()
|
||||
self.compiled_parameters['title'] = self.read_title()
|
||||
self.compiled_parameters['comments'] = self.read_comments()
|
||||
self.compiled_parameters['nucleus'] = self.read_nucleus()
|
||||
self.compiled_parameters['sample'] = self.read_sample()
|
||||
|
||||
# then, load the thing into TNMR
|
||||
success = self.tnmr().load_sequence(filename)
|
||||
if not(success):
|
||||
self.logger.info('Retrying load pulse sequence')
|
||||
success = self.tnmr().load_sequence(filename)
|
||||
if not(success):
|
||||
self.logger.info('Failed pulse sequence load!')
|
||||
raise Exception()
|
||||
|
||||
# load some parameters back to TNMR
|
||||
for key, val in dashboard_params.items():
|
||||
self.tnmr().set_nmrparameter(key, val)
|
||||
|
||||
# finally, let ourselves know we're ready
|
||||
self.status = ('PREPARED', 'compiled')
|
||||
else:
|
||||
traceback.print_exc()
|
||||
|
||||
def __zero_go(self):
|
||||
'''Tells TNMR to acquire data. Only call after __compile_sequence().'''
|
||||
if(self.status[0] != 'BUSY' or self.starting):
|
||||
self.tnmr().ZeroGo(lock=False, check_time=max(int(self.approx_sequence_length*1.5), 5))
|
||||
|
||||
def __compile_and_run(self, thread=True):
|
||||
'''Compiles and runs the currently-loaded sequence
|
||||
|
||||
Parameters
|
||||
----------
|
||||
thread: bool, determines if this should open a child thread and detach the process
|
||||
'''
|
||||
self.starting = True
|
||||
self.__compile_sequence()
|
||||
time.sleep(1.0)
|
||||
self.__zero_go()
|
||||
self.starting = False
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user