A few things: 1. Got it working again; 2. Renamed files to make more sense; 3. Replaced template tmp.tnt with an emptied out file that previously took data, now data is collected correctly (bug, I'm not sure where this need comes from but this is, as far as I know, a permanent workaround); 4. Added automatic COM interface restart on errors compiling; 5. Implemented variable acquisition times.
This commit is contained in:
336
frappy_psi/tnmr/tnmr_interface.py
Normal file
336
frappy_psi/tnmr/tnmr_interface.py
Normal file
@@ -0,0 +1,336 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
TNMR_DG_Extension
|
||||
______________________
|
||||
|
||||
Version: 1.0
|
||||
Authors: Davis Garrad (Paul Scherrer Institute, CH)
|
||||
______________________
|
||||
|
||||
Wrapper for communication with TecMag TNMR software, specifically in the context of the NMR setup in Gediminas Simutis' Lab at PSI, CH.
|
||||
"""
|
||||
import os
|
||||
|
||||
TEMPLATE_FILE_PATH = os.path.dirname(os.path.realpath(__file__)) + '/templates/' # TODO: make prettier
|
||||
|
||||
import win32com.client
|
||||
import pythoncom
|
||||
import time
|
||||
import json
|
||||
|
||||
class TNMRNotRunnningError(Exception):
|
||||
def __init__(self, msg=None):
|
||||
if msg is None:
|
||||
msg = "No instance of TNMR running. Start TNMR and try again."
|
||||
super(TNMRNotRunnningError, self).__init__(msg)
|
||||
|
||||
class TNMR:
|
||||
""" This class allows to communicate with Tecmag TNMR software for easier acquisition
|
||||
control.
|
||||
|
||||
Instance Attributes
|
||||
-------------------
|
||||
NMTNR: win32com object
|
||||
ACTIVEFILE: str
|
||||
path to active TNMR file
|
||||
|
||||
Methods
|
||||
-------
|
||||
openfile(filepath: str, active: bool):
|
||||
opens the tnt file specified by filepath
|
||||
if active is true, the newly opened file will be set to ACTIVEFILE
|
||||
set_activefile():
|
||||
set ACTIVEFILE to current TNMR active doc path
|
||||
ZeroGo(lock: bool, interval: float):
|
||||
if possible, starts an experiment
|
||||
if lock is true, the program will be blocked from further interaction until
|
||||
the acqusition endswith
|
||||
acquisition_running():
|
||||
returns the acquisition status of TNMR
|
||||
"""
|
||||
def __init__(self, filepath = "", NTNMR_inst=None):
|
||||
""" Creates an instance of the NTNMR class, which is used to communicate with TNMR,
|
||||
Tecmags control software. Basically a wrapper for TNMRs api
|
||||
|
||||
Parameters
|
||||
----------
|
||||
filepath: specifies a path to the file tnt you want to use
|
||||
"""
|
||||
#first we check if an instance of TNMR is running an get it or create it
|
||||
print('Opening TNMR connection')
|
||||
if(NTNMR_inst is None):
|
||||
self.reset_NTNMR_instance()
|
||||
else:
|
||||
self.NTNMR = NTNMR_inst
|
||||
# next we open a specified file. If none is specified, then we use the active file
|
||||
if filepath != "":
|
||||
print(f'Loading file {filepath}')
|
||||
self.NTNMR.OpenFile(filepath)
|
||||
self.ACTIVEFILE = self.NTNMR.GetActiveDocPath
|
||||
self.ACTIVEPATH = os.path.dirname(self.ACTIVEFILE)
|
||||
print(f'Active file: {self.ACTIVEFILE} in path {self.ACTIVEPATH}')
|
||||
|
||||
def reset_NTNMR_instance(self):
|
||||
try:
|
||||
self.NTNMR = win32com.client.GetActiveObject("NTNMR.Application")
|
||||
except pythoncom.com_error:
|
||||
raise TNMRNotRunnningError
|
||||
|
||||
def execute_cmd(self, cmd):
|
||||
print('W: Executing arbitrary command: ' + f'out = self.NTNMR.{cmd}')
|
||||
out = 0
|
||||
exec(f'out = self.NTNMR.{cmd}\nprint("W: OUTPUT: " + str(out))')
|
||||
return out
|
||||
|
||||
def openfile(self, filepath, active = True):
|
||||
""" Opens a new file. Per default, the new file will be selected as active.
|
||||
Set active = False to prevent this.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
filepath: str
|
||||
path to tnt file. Make sure to pass a raw string (i.e. backslashes are
|
||||
escaped)
|
||||
active: bool
|
||||
"""
|
||||
print(f'Opening file {filepath}')
|
||||
self.NTNMR.OpenFile(filepath)
|
||||
if active:
|
||||
self.ACTIVEFILE = self.NTNMR.GetActiveDocPath
|
||||
print(f'Active file: {self.ACTIVEFILE} in path {self.ACTIVEPATH}')
|
||||
|
||||
def set_activefile(self):
|
||||
""" Sets TNMR active doc path to ACTIVEFILE
|
||||
"""
|
||||
self.ACTIVEFILE = self.NTNMR.GetActiveDocPath
|
||||
self.ACTIVEPATH = os.path.dirname(self.ACTIVEFILE)
|
||||
print(f'Active file: {self.ACTIVEFILE} in path {self.ACTIVEPATH}')
|
||||
|
||||
def ZeroGo(self, lock = True, interval = 0.5):
|
||||
""" If possible, zeros and starts acquisition
|
||||
|
||||
Parameters
|
||||
----------
|
||||
lock: bool
|
||||
if true, program waits until acquisition is done
|
||||
interval: float
|
||||
how often to check if acquisition done
|
||||
"""
|
||||
# for some reason CheckAcquisition is False while an experiment is
|
||||
# running but true otherwise
|
||||
print('Zero-going...')
|
||||
if self.NTNMR.CheckAcquisition == True:
|
||||
self.NTNMR.ZG
|
||||
else:
|
||||
print('An Acquisition is already running')
|
||||
|
||||
if lock:
|
||||
print("Application locked during acquisition\n...waiting...")
|
||||
while self.NTNMR.CheckAcquisition == False:
|
||||
time.sleep(interval)
|
||||
print("Acquisition done")
|
||||
|
||||
def acquisition_running(self):
|
||||
""" Checks if acquisition is running
|
||||
|
||||
Returns
|
||||
-------
|
||||
True: if running
|
||||
False: if not running
|
||||
"""
|
||||
return not(self.NTNMR.CheckAcquisition)
|
||||
|
||||
def get_data(self):
|
||||
raw_data = self.NTNMR.GetData
|
||||
reals = raw_data[::2]
|
||||
imags = raw_data[1::2]
|
||||
|
||||
return (reals, imags)
|
||||
|
||||
def get_data_times(self):
|
||||
#acq_n = int(self.NTNMR.GetNMRParameter('Acq. Points')) # TODO: These do NOT return the actual used values!
|
||||
#acq_t = self.NTNMR.GetNMRParameter('Acq. Time')
|
||||
#acq_t = acq_t.strip()
|
||||
#if(acq_t[-1] == 'm'):
|
||||
# acq_t = float(acq_t[:-1]) * 1000
|
||||
#elif(acq_t[-1] == 'u'):
|
||||
# acq_t = float(acq_t[:-1])
|
||||
#elif(acq_t[-1] == 'n'):
|
||||
# acq_t = float(acq_t[:-1]) / 1000
|
||||
acq_t = 204.8 # us
|
||||
acq_n = 1024
|
||||
return [ i * (acq_t / (acq_n - 1)) for i in range(0, acq_n+1) ]
|
||||
|
||||
def save_file(self, filepath=''):
|
||||
""" Save file to filepath. if no filepath specified, save current active file
|
||||
|
||||
Parameters
|
||||
----------
|
||||
filepath: str
|
||||
"""
|
||||
print('I: Saving')
|
||||
if filepath == '':
|
||||
self.NTNMR.Save
|
||||
else:
|
||||
self.NTNMR.SaveAs(filepath)
|
||||
print(f'I: Saved to file {filepath}')
|
||||
|
||||
def set_nmrparameter(self, param_name: str, value: str):
|
||||
"""Sets the value of an NMR parameter by name.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
param_name: str
|
||||
value: str
|
||||
|
||||
Returns
|
||||
-------
|
||||
True: if successful
|
||||
False: otherwise.
|
||||
"""
|
||||
if(self.is_nmrparameter(param_name)):
|
||||
self.NTNMR.SetNMRParameter(param_name, value)
|
||||
print(f'I: Setting parameter {param_name} to value of {value}')
|
||||
return True
|
||||
print(f'W: Failed to set parameter {param_name} to {value}')
|
||||
return False
|
||||
|
||||
def get_nmrparameter(self, param_name: str):
|
||||
"""Returns the value of an NMR parameter by name.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
param_name: str
|
||||
|
||||
Returns
|
||||
-------
|
||||
The value of the parameter: if found
|
||||
None: Else
|
||||
"""
|
||||
try:
|
||||
return self.NTNMR.GetNMRParameter(param_name)
|
||||
except:
|
||||
print('not a param. try one of:', self.get_page_parameters('Sequence'))
|
||||
return None
|
||||
|
||||
def is_nmrparameter(self, param_name: str):
|
||||
"""Checks that a given parameter actually exists in the setup.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
param_name: str
|
||||
|
||||
Returns
|
||||
-------
|
||||
True: if the parameter exists
|
||||
False: otherwise.
|
||||
"""
|
||||
try:
|
||||
self.NTNMR.GetNMRParameter(param_name)
|
||||
return True
|
||||
except:
|
||||
return False
|
||||
|
||||
def get_all_nmrparameters(self):
|
||||
"""Gets all parameter names and values from all pages of the NMR parameters.
|
||||
|
||||
Returns
|
||||
-------
|
||||
A dictionary of all parameters, in form { [page name]: { [parameter name]: [parameter value], ... }, ... }
|
||||
"""
|
||||
full_dict = {}
|
||||
pages = self.NTNMR.GetParameterPageList.split(",")
|
||||
for p in pages:
|
||||
p = p.strip()
|
||||
sub_dict = self.get_page_parameters(p)
|
||||
full_dict[p] = sub_dict
|
||||
|
||||
return full_dict
|
||||
|
||||
def get_page_parameters(self, page):
|
||||
"""Gets the parameters used for the sequence.
|
||||
|
||||
Returns
|
||||
-------
|
||||
a dictionary of the sequence parameters.
|
||||
"""
|
||||
sub_dict = { }
|
||||
params_raw = self.NTNMR.GetParameterListInPage(page)
|
||||
params = params_raw[params_raw.find('=')+1:].split(",")
|
||||
for param in params:
|
||||
param_stripped = param.strip()
|
||||
val = self.get_nmrparameter(param_stripped)
|
||||
if not(val is None):
|
||||
sub_dict[param_stripped] = str(val)
|
||||
else:
|
||||
print(param_stripped)
|
||||
|
||||
return sub_dict
|
||||
|
||||
def load_sequence(self, filename):
|
||||
"""WARNING: POSSIBLY DESTRUCTIVE TO DATA
|
||||
Reads a sequence file and (hopefully) updates the dashboard on the Sequence page. Because of various problems with the TNMR API, this function does the following:
|
||||
1. Closes the currently active file. ENSURE YOUR DATA IS SAVED BEFORE USING THIS FUNCTION. It will NOT verify that everything is up to date, nor block. Be wise.
|
||||
2. Opens a template file, located at a predefined location, defined in this file.
|
||||
3. Loads the given sequence into this template file (tmp.tnt)
|
||||
4. Saves this as a new template file (tmper.tnt)
|
||||
5. Closes and reloads the new template file, to make the sequence parameters visible.
|
||||
|
||||
There is incredible ''bodging'' (as Tom calls it) in this code, and it is entirely Tecmag's fault, as their API simply doesn't do what it says it does.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
filename: str
|
||||
|
||||
Returns
|
||||
-------
|
||||
True: if successful
|
||||
False: if otherwise. (TODO: Exceptions-based rather than this)
|
||||
"""
|
||||
|
||||
print(f'Loading sequence at {filename}')
|
||||
self.NTNMR.CloseActiveFile
|
||||
success = self.NTNMR.OpenFile(TEMPLATE_FILE_PATH + 'tmp.tnt')
|
||||
if(success):
|
||||
print('Template file reloaded')
|
||||
else:
|
||||
print(f'Failed to load template file. Please ensure that there exists an empty .tnt file named {TEMPLATE_FILE_PATH}/tmp.tnt (Close, New, Save As...)')
|
||||
return False
|
||||
self.set_activefile()
|
||||
|
||||
success = self.NTNMR.LoadSequence(filename if filename[-4:]=='.tps' else (filename+'.tps'))
|
||||
if(success):
|
||||
print(f'Successfully loaded sequence')
|
||||
else:
|
||||
print('Failed to load sequence')
|
||||
return False
|
||||
|
||||
self.NTNMR.SaveAs(TEMPLATE_FILE_PATH + 'tmper.tnt') # even more temporary
|
||||
success = self.NTNMR.OpenFile(TEMPLATE_FILE_PATH + 'tmper.tnt') # reload the file so that we can actually read/write to the Sequence parameters (TNMR bug)
|
||||
self.set_activefile()
|
||||
|
||||
if(success):
|
||||
print(f'Successfully reloaded')
|
||||
else:
|
||||
print('Failed to reload')
|
||||
return False
|
||||
print(f'I: Successfully loaded sequence from {filename}')
|
||||
|
||||
return True
|
||||
|
||||
def load_dashboard(self, dashboard_fn):
|
||||
print(f'I: Loading dashboard setup from {dashboard_fn}')
|
||||
|
||||
success = self.NTNMR.LoadParameterSetupFromFile(dashboard_fn)
|
||||
if(success):
|
||||
print(f'I: Successfully loaded dashboard')
|
||||
else:
|
||||
print(f'W: Failed to load dashboard')
|
||||
return success
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user