frappy_psi: Added support for changing the observation frequency & number of scans. Further, added an automatic dashboard load on file setup, and a template dashboard for the Scout device.

This commit is contained in:
2025-06-13 07:58:25 +02:00
parent 365f0a2374
commit 2fce39c381
4 changed files with 110 additions and 44 deletions

View File

@@ -68,7 +68,7 @@ class ProgrammedSequence(fc.Readable):
imags=fc.ArrayOf(fc.FloatRange(), maxlen=4096), # imag values
t =fc.ArrayOf(fc.FloatRange(), maxlen=4096)), # times (starting from zero)
default={ 'reals': [], 'imags': [], 't': [] })
status = fc.Parameter(datatype=frappy.datatypes.StatusType(fc.Readable, "DISABLED", 'PREPARED', 'BUSY'))
status = fc.Parameter(datatype=frappy.datatypes.StatusType(fc.Readable, "DISABLED", 'PREPARED', 'BUSY'), default=('IDLE', 'ok - uncompiled'))
pollinterval = fc.Parameter(default=1)
# basic
@@ -90,14 +90,21 @@ class ProgrammedSequence(fc.Readable):
pre_acquisition_time = fc.Parameter('pre_acquisition_time', fc.FloatRange(unit='u'), readonly=False, group='sequence_editor', default=1)
post_acquisition_time = fc.Parameter('post_acquisition_time', fc.FloatRange(unit='m'), readonly=False, group='sequence_editor', default=500)
acq_phase_cycle = fc.Parameter('acq_phase_cycle', fc.StringType(), readonly=False, group='sequence_editor', default='')
num_scans = fc.Parameter('num_scans', fc.IntRange(), readonly=False, group='sequence_editor', default=16)
obs_freq = fc.Parameter('obs_freq', fc.FloatRange(unit='MHz'), readonly=False, group='sequence_editor', default=213.16)
inited = False
### SETUP
def tnmr(self):
if not(self.inited):
self.ntnmr = te.TNMR()
self.inited = True
try:
self.ntnmr = te.TNMR()
self.inited = True
except Exception as e:
print(str(e), repr(e))
self.ntnmr = None
self.inited = False
return self.ntnmr
def initialReads(self):
@@ -134,6 +141,14 @@ class ProgrammedSequence(fc.Readable):
threading.Thread(target=lambda s=self: s.__compile_and_run()).start()
else:
self.__compile_and_run()
@fc.Command(description="Kill")
def kill(self):
try:
self.tnmr().get_instance().Abort
self.status = ('IDLE', 'ok - killed')
except:
pass
### READ/WRITE
@@ -183,6 +198,24 @@ class ProgrammedSequence(fc.Readable):
self.status = ('IDLE', 'ok - uncompiled')
return self.read_acq_phase_cycle()
def read_num_scans(self):
return self.tnmr().get_nmrparameter('Scans 1D')
def write_num_scans(self, t):
if(self.status[0] != 'BUSY'):
self.tnmr().set_nmrparameter('Scans 1D', t)
self.status = ('IDLE', 'ok - uncompiled')
return self.read_num_scans()
def read_obs_freq(self):
return self.tnmr().get_nmrparameter('Observe Freq.')
def write_obs_freq(self, t):
if(self.status[0] != 'BUSY'):
self.tnmr().set_nmrparameter('Observe Freq.', t)
self.status = ('IDLE', 'ok - uncompiled')
return self.read_obs_freq()
### PRIVATE (Utility)
def __compile_sequence(self):
if(self.status[0] != 'BUSY'):
@@ -209,9 +242,17 @@ class ProgrammedSequence(fc.Readable):
seq_gen.save_sequence(filename, seq)
seq_gen.save_sequence_cfg(filename, seq)
dashboard_params = { 'Observe Freq.': self.read_obs_freq(),
'Scans 1D': self.read_num_scans(),
}
# then, load the thing into TNMR
self.tnmr().load_sequence(filename)
# load some parameters back to TNMR
for key, val in dashboard_params.items():
self.tnmr().set_nmrparameter(key, val)
# finally, let ourselves know we're ready
self.status = ('PREPARED', 'compiled')
@@ -227,22 +268,9 @@ class ProgrammedSequence(fc.Readable):
self.status = ('PREPARED', 'compiled')
def __compile_and_run(self, thread=True, recurse=True):
pythoncom.CoInitialize()
try:
self.__compile_sequence()
self.__zero_go()
except AttributeError as e:
print(f'Attribute error on compile and run.{" Resetting the COM interface and retrying..." if recurse else " Resetting did not fix this problem!"}')
self.status = ('IDLE', 'ok - uncompiled')
self.tnmr().reset_NTNMR_instance()
self.__compile_and_run(thread, recurse=False)
except Exception as e:
print('Failed to compile and run!')
print(str(e))
print(repr(e))
self.status = ('IDLE', 'ok - uncompiled')
if(thread):
pythoncom.CoUninitialize()
self.tnmr().reset_NTNMR_instance()
self.__compile_sequence()
self.__zero_go()

View File

@@ -31,9 +31,18 @@ def get_single_pulse_block(name, pulse_width, pulse_height, relaxation_time, pha
a dictionary which can be updated with others to generate a larger, more complex sequence.
'''
if(relaxation_time.strip()[-1] == 'u'):
relax_time = float(relaxation_time.strip()[:-1])
elif(relaxation_time.strip()[-1] == 'n'):
relax_time = float(relaxation_time.strip()[:-1]) * 1000
if(relaxation_time.strip()[-1] == 'm'):
relax_time = float(relaxation_time.strip()[:-1]) / 1e3
if(relaxation_time.strip()[-1] == 's'):
relax_time = float(relaxation_time.strip()[:-1]) / 1e6
ph = name + '_phase'
rl = name + '_relaxation'
block = se.generate_default_sequence([ ph, rl ], [ pulse_width, relaxation_time ])
block = se.generate_default_sequence([ ph, rl ] if relax_time > 0 else [ ph ], [ pulse_width, relaxation_time ] if relax_time > 0 else [pulse_width])
# COLUMNNS
# PH column
@@ -41,9 +50,11 @@ def get_single_pulse_block(name, pulse_width, pulse_height, relaxation_time, pha
block['columns'][ph]['Delay'] = str(pulse_width)
block['columns'][ph]['F1_UnBlank']['value'] = '1'
block['columns'][ph]['Rx_Blank']['value'] = '1'
# relaxation column
block['columns'][rl]['F1_UnBlank']['value'] = '1'
block['columns'][rl]['Rx_Blank']['value'] = '1'
if(relax_time > 0):
# relaxation column
block['columns'][rl]['F1_UnBlank']['value'] = '1'
block['columns'][rl]['Rx_Blank']['value'] = '1'
if(phase_cycle != ''):
table_name = f'ph_{name}'

View File

@@ -0,0 +1,5 @@
Acquisition = Nucleus, Observe Freq., Acq. Points, Points 1D, SW +/-, Filter, Dwell Time, Acq. Time, Last Delay, ::, Scans 1D, Actual Scans 1D, Scan Start 1D, Repeat Times, S.A. Dimension, Dummy Scans, Receiver Gain, ::, Points 2D, Actual Points 2D, Points Start 2D, Points 3D, Actual Points 3D, Points Start 3D, Points 4D, Actual Points 4D, Points Start 4D, ::, SW 2D, SW 3D, SW 4D, Dwell_2D, Dwell_3D, Dwell_4D
Frequency = Observe Freq., Observe Ch., ::, F1 Freq., F2 Freq.
Processing = Shift # Points, LB 1D, GB 1D, DM 1D, SB Shift 1D, SB Width 1D, SB Skew 1D, TZ 1 1D, TZ 2 1D, TZ 3 1D, TZ 4 1D, Traf 1D, Sys. Phase 0 1D, Sys. Phase 1 1D, Phase 0 1D, Phase 1 1D, Echo Center 1D, ::, LB 2D, GB 2D, DM 2D, SB Shift 2D, SB Width 2D, SB Skew 2D, TZ 1 2D, TZ 2 2D, TZ 3 2D, TZ 4 2D, Traf 2D, Sys. Phase 0 2D, Sys. Phase 1 2D, Phase 0 2D, Phase 1 2D, Echo Center 2D, ::, LB 3D, GB 3D, DM 3D, SB Shift 3D, SB Width 3D, SB Skew 3D, TZ 1 3D, TZ 2 3D, TZ 3 3D, TZ 4 3D, Traf 3D, Sys. Phase 0 3D, Sys. Phase 1 3D, Phase 0 3D, Phase 1 3D, Echo Center 3D, ::, LB 4D, GB 4D, DM 4D, SB Shift 4D, SB Width 4D, SB Skew 4D, TZ 1 4D, TZ 2 4D, TZ 3 4D, TZ 4 4D, Traf 4D, Sys. Phase 0 4D, Sys. Phase 1 4D, Phase 0 4D, Phase 1 4D, Echo Center 4D
Misc. = Date, Magnet Field, Absolute Freq., Exp. Start Time, Exp. Finish Time, Exp. Elapsed Time

View File

@@ -57,8 +57,8 @@ class TNMR:
filepath: specifies a path to the file tnt you want to use
"""
#first we check if an instance of TNMR is running an get it or create it
print('Opening TNMR connection')
if(NTNMR_inst is None):
print('Opening new TNMR connection')
self.reset_NTNMR_instance()
else:
self.NTNMR = NTNMR_inst
@@ -72,10 +72,18 @@ class TNMR:
def reset_NTNMR_instance(self):
try:
pythoncom.CoInitialize()
self.NTNMR = win32com.client.GetActiveObject("NTNMR.Application")
except pythoncom.com_error:
raise TNMRNotRunnningError
def get_instance(self):
try:
pythoncom.CoInitialize()
return win32com.client.GetActiveObject("NTNMR.Application")
except pythoncom.com_error:
raise TNMRNotRunnningError
def execute_cmd(self, cmd):
print('W: Executing arbitrary command: ' + f'out = self.NTNMR.{cmd}')
out = 0
@@ -119,14 +127,15 @@ class TNMR:
# for some reason CheckAcquisition is False while an experiment is
# running but true otherwise
print('Zero-going...')
if self.NTNMR.CheckAcquisition == True:
self.NTNMR.ZG
ntnmr = self.get_instance()
if not(self.acquisition_running()):
ntnmr.ZG
else:
print('An Acquisition is already running')
if lock:
print("Application locked during acquisition\n...waiting...")
while self.NTNMR.CheckAcquisition == False:
while self.acquisition_running():
time.sleep(interval)
print("Acquisition done")
@@ -138,10 +147,16 @@ class TNMR:
True: if running
False: if not running
"""
return not(self.NTNMR.CheckAcquisition)
#try:
ntnmr = self.get_instance()
res = not(ntnmr.CheckAcquisition)
#except AttributeError as e:
# if(e
# res = False
return res
def get_data(self):
raw_data = self.NTNMR.GetData
raw_data = self.get_instance().GetData
reals = raw_data[::2]
imags = raw_data[1::2]
@@ -170,9 +185,9 @@ class TNMR:
"""
print('I: Saving')
if filepath == '':
self.NTNMR.Save
self.get_instance().Save
else:
self.NTNMR.SaveAs(filepath)
self.get_instance().SaveAs(filepath)
print(f'I: Saved to file {filepath}')
def set_nmrparameter(self, param_name: str, value: str):
@@ -189,7 +204,7 @@ class TNMR:
False: otherwise.
"""
if(self.is_nmrparameter(param_name)):
self.NTNMR.SetNMRParameter(param_name, value)
self.get_instance().SetNMRParameter(param_name, value)
print(f'I: Setting parameter {param_name} to value of {value}')
return True
print(f'W: Failed to set parameter {param_name} to {value}')
@@ -208,10 +223,11 @@ class TNMR:
None: Else
"""
try:
return self.NTNMR.GetNMRParameter(param_name)
except:
return self.get_instance().GetNMRParameter(param_name)
except Exception as e:
print(str(e), repr(e))
print('not a param. try one of:', self.get_page_parameters('Sequence'))
return None
return NoneW
def is_nmrparameter(self, param_name: str):
"""Checks that a given parameter actually exists in the setup.
@@ -226,8 +242,11 @@ class TNMR:
False: otherwise.
"""
try:
self.NTNMR.GetNMRParameter(param_name)
self.get_instance().GetNMRParameter(param_name)
return True
except AttributeError:
self.reset_NTNMR_instance()
return self.is_nmrparameter(param_name)
except:
return False
@@ -239,7 +258,7 @@ class TNMR:
A dictionary of all parameters, in form { [page name]: { [parameter name]: [parameter value], ... }, ... }
"""
full_dict = {}
pages = self.NTNMR.GetParameterPageList.split(",")
pages = self.get_instance().GetParameterPageList.split(",")
for p in pages:
p = p.strip()
sub_dict = self.get_page_parameters(p)
@@ -255,7 +274,7 @@ class TNMR:
a dictionary of the sequence parameters.
"""
sub_dict = { }
params_raw = self.NTNMR.GetParameterListInPage(page)
params_raw = self.get_instance().GetParameterListInPage(page)
params = params_raw[params_raw.find('=')+1:].split(",")
for param in params:
param_stripped = param.strip()
@@ -288,25 +307,28 @@ class TNMR:
False: if otherwise. (TODO: Exceptions-based rather than this)
"""
ntnmr = self.get_instance()
print(f'Loading sequence at {filename}')
self.NTNMR.CloseActiveFile
success = self.NTNMR.OpenFile(TEMPLATE_FILE_PATH + 'tmp.tnt')
ntnmr.CloseActiveFile
success = ntnmr.OpenFile(TEMPLATE_FILE_PATH + 'tmp.tnt')
if(success):
print('Template file reloaded')
else:
print(f'Failed to load template file. Please ensure that there exists an empty .tnt file named {TEMPLATE_FILE_PATH}/tmp.tnt (Close, New, Save As...)')
return False
self.set_activefile()
self.load_dashboard(TEMPLATE_FILE_PATH + 'scout_dashboard.txt')
success = self.NTNMR.LoadSequence(filename if filename[-4:]=='.tps' else (filename+'.tps'))
success = ntnmr.LoadSequence(filename if filename[-4:]=='.tps' else (filename+'.tps'))
if(success):
print(f'Successfully loaded sequence')
else:
print('Failed to load sequence')
return False
self.NTNMR.SaveAs(TEMPLATE_FILE_PATH + 'tmper.tnt') # even more temporary
success = self.NTNMR.OpenFile(TEMPLATE_FILE_PATH + 'tmper.tnt') # reload the file so that we can actually read/write to the Sequence parameters (TNMR bug)
ntnmr.SaveAs(TEMPLATE_FILE_PATH + 'tmper.tnt') # even more temporary
success = ntnmr.OpenFile(TEMPLATE_FILE_PATH + 'tmper.tnt') # reload the file so that we can actually read/write to the Sequence parameters (TNMR bug)
self.set_activefile()
if(success):
@@ -321,7 +343,7 @@ class TNMR:
def load_dashboard(self, dashboard_fn):
print(f'I: Loading dashboard setup from {dashboard_fn}')
success = self.NTNMR.LoadParameterSetupFromFile(dashboard_fn)
success = self.get_instance().LoadParameterSetupFromFile(dashboard_fn)
if(success):
print(f'I: Successfully loaded dashboard')
else: