Files
frappy/frappy_psi/tnmr/tnmr_interface.py

465 lines
18 KiB
Python

# -*- coding: utf-8 -*-
"""
tnmr_interface
______________________
Version: 1.0
Authors: Davis Garrad (Paul Scherrer Institute, CH)
______________________
Wrapper for communication with TecMag TNMR software, specifically in the context of the NMR setup in Gediminas Simutis' Lab at PSI, CH.
"""
import os
TEMPLATE_FILE_PATH = os.path.dirname(os.path.realpath(__file__)) + '/templates/' # TODO: make prettier
import win32com.client
import pythoncom
import frappy_psi.tnmr.NTNMR as NTNMR
import winapps
import time
import json
import traceback
import threading
class TNMRNotRunnningError(Exception):
def __init__(self, msg=None):
if msg is None:
msg = "No instance of TNMR running. Start TNMR and try again."
super(TNMRNotRunnningError, self).__init__(msg)
class TNMR:
""" This class allows to communicate with Tecmag TNMR software for easier acquisition
control.
Instance Attributes
-------------------
ACTIVEFILE: str
path to active TNMR file
ACTIVEPATH: str
path to active TNMR file's parent directory.
Methods
-------
get_instance():
Gets an instance of the NTNMR "API". Best to use this rather than keep an object, as threads can mess with the usefulness of an object.
execute_cmd(cmd):
Sends an arbitrary command to the TNMR software. Best for debugging.
openfile(filepath: str, active: bool):
opens the tnt file specified by filepath
if active is true, the newly opened file will be set to ACTIVEFILE
set_activefile():
set ACTIVEFILE to current TNMR active doc path
ZeroGo(lock: bool, interval: float):
if possible, starts an experiment
if lock is true, the program will be blocked from further interaction until
the acqusition endswith
acquisition_running():
returns the acquisition status of TNMR
get_data():
Pulls the currently loaded data from TNMR and returns it.
save_file(filepath=''):
Saves the experiment to a file
set_nmrparameter(param_name: str, value: str):
If it exists, sets an NMR parameter
get_nmrparameter(param_name: str):
If it exists, returns the value of an NMR parameter
is_nmrparameter(param_name: str):
Checks if an NMR parameter exists.
get_all_nmrparameters():
Returns all possible NMR parameters in the dashboard.
get_page_parameters(page):
Returns all possible NMR parameters on a page of the dashboard.
load_sequence(filename):
WARNING: POSSIBLY DESTRUCTIVE. ENSURE DATA IS SAVED BEFORE CALLING.
Loads a sequence (and reloads the template dashboard) into the TNMR software.
load_dashboard(dashboard_fn):
Loads a dashboard (resetting parameters) into TNMR.
"""
def __init__(self, filepath = ""):
""" Creates an instance of the NTNMR class, which is used to communicate with TNMR,
Tecmags control software. Basically a wrapper for TNMRs api
Parameters
----------
filepath: specifies a path to the file tnt you want to use
"""
#first we check if an instance of TNMR is running an get it or create it
print('Opening new TNMR connection')
ntnmr = self.get_instance()
# next we open a specified file. If none is specified, then we use the active file
if filepath != "":
print(f'Loading file {filepath}')
ntnmr.OpenFile(filepath)
self.ACTIVEFILE = ntnmr.GetActiveDocPath()
self.ACTIVEPATH = os.path.dirname(self.ACTIVEFILE)
print(f'Active file: {self.ACTIVEFILE} in path {self.ACTIVEPATH}')
def get_instance(self):
'''Tries to open up a Windows COM connection to the TNMR program, and returns an instance if able'''
try:
pythoncom.CoInitialize()
return win32com.client.GetActiveObject("NTNMR.Application")
except pythoncom.com_error:
try:
self.open_TNMR()
time.sleep(10)
# try again
pythoncom.CoInitialize()
return win32com.client.GetActiveObject("NTNMR.Application")
except:
raise TNMRNotRunnningError
raise TNMRNotRunnningError
def open_TNMR(self):
'''Tries to open up TNMR, using its registration in the Windows registry'''
for app in winapps.search_installed('TNMR'):
os.system('start ' + str(app.install_location.absolute()) + '\\bin\\TNMR.exe')
def execute_cmd(self, cmd):
'''Sends an arbitrary command through to TNMR'''
print('W: Executing arbitrary command: ' + f'out = self.get_instance().{cmd}')
out = 0
exec(f'out = self.get_instance().{cmd}\nprint("W: OUTPUT: " + str(out))')
return out
def openfile(self, filepath, active = True):
""" Opens a new file. Per default, the new file will be selected as active.
Set active = False to prevent this.
Parameters
----------
filepath: str
path to tnt file. Make sure to pass a raw string (i.e. backslashes are
escaped)
active: bool
"""
print(f'Opening file {filepath}')
ntnmr = self.get_instance()
ntnmr.OpenFile(filepath)
if active:
self.ACTIVEFILE = ntnmr.GetActiveDocPath()
print(f'Active file: {self.ACTIVEFILE} in path {self.ACTIVEPATH}')
def set_activefile(self):
""" Sets TNMR active doc path to ACTIVEFILE
"""
self.ACTIVEFILE = self.get_instance().GetActiveDocPath()
self.ACTIVEPATH = os.path.dirname(self.ACTIVEFILE)
print(f'Active file: {self.ACTIVEFILE} in path {self.ACTIVEPATH}')
def ZeroGo(self, lock = True, interval = 0.5, check_time=10):
""" If possible, zeros and starts acquisition
Parameters
----------
lock: bool
if true, program waits until acquisition is done
interval: float
how often to check if acquisition done
check_time: float
how many seconds until not recieving new data is considered grounds for another Zero-Go attempt. Recommended to set as at least the length of 2-3 pulse sequences.
"""
# for some reason CheckAcquisition is False while an experiment is
# running but true otherwise
CHECK_MODE = 'thread' # thread OR data
print('Zero-going...')
ntnmr = self.get_instance()
if not(self.acquisition_running()):
#print('Reset')
#ntnmr.Reset() # to avoid hardware issues? EDIT: Doesn't seem to do much...
if(CHECK_MODE == 'data'):
print('Artificially setting the zeroth point to NULL for error detection.')
ntnmr.SetDataPoint(1, [0,0])
print('ZG')
try:
def t(s):
print('\nStart ZG lambda')
try:
s.get_instance().ZeroAndGo()
except:
print('\nException in ZG lambda')
pass
print('\nCompletion of ZG lambda')
return
thread = threading.Thread(target=t, args=(self,))
thread.start()
except:
traceback.print_exc()
print('ZG completed')
else:
print('An acquisition is already running')
return
if(CHECK_MODE == 'data'):
elapsed = 0
print('Waiting to recieve real data')
while(True):
try:
d = ntnmr.GetData()
if not(d is None):
if(d[0] != 0):
break
except:
traceback.print_exc()
time.sleep(0.1)
elapsed += 0.1
print(f'\rElapsed: {elapsed:.1f}s/{check_time:.1f}s', end='')
if(elapsed > check_time): # broken
print('\nTimeout! No data!')
ntnmr.Abort()
self.ZeroGo(lock=lock, interval=interval, check_time=check_time)
break
print('\n')
elif(CHECK_MODE == 'thread'):
print('Giving ZeroGo command a grace period to terminate...')
elapsed = 0.0
while(elapsed < 2.0):
if not(thread.is_alive()):
print('\nZeroGo terminated in time. Continuing...', end='')
break
time.sleep(0.1)
elapsed += 0.1
print(f'\rElapsed: {elapsed:.1f}s/2.0s', end='')
print('\n')
if(thread.is_alive()): # technically possible that it dies at the verrrry last moment, so may as well add an if-condition. What can I say? I'm merciful.
print('ZeroGo did not terminate. This is a sign of an error. Retrying...')
# the thread still hasn't died - this is a sign that the ZeroGo got caught up with some sort of error. Abandon, and retry.
ntnmr.Abort()
self.ZeroGo(lock=lock, interval=interval, check_time=check_time)
if lock:
print("Application locked during acquisition. Waiting...")
while self.acquisition_running():
time.sleep(interval)
print("Acquisition done")
def acquisition_running(self):
""" Checks if acquisition is running
Returns
-------
True: if running
False: if not running
"""
ntnmr = self.get_instance()
res = not(ntnmr.CheckAcquisition())
return res
def get_data(self):
'''Pulls data from TNMR
Returns
-------
a tuple of ([real_array], [imaginary_array])
'''
raw_data = self.get_instance().GetData()
reals = raw_data[::2]
imags = raw_data[1::2]
return (reals, imags)
def save_file(self, filepath=''):
""" Save file to filepath. if no filepath specified, save current active file
Parameters
----------
filepath: str
"""
print('I: Saving')
if filepath == '':
self.get_instance().Save()
else:
self.get_instance().SaveAs(filepath)
print(f'I: Saved to file {filepath}')
def set_nmrparameter(self, param_name: str, value: str):
"""Sets the value of an NMR parameter by name.
Parameters
----------
param_name: str
value: str
Returns
-------
True: if successful
False: otherwise.
"""
if(self.is_nmrparameter(param_name)):
self.get_instance().SetNMRParameter(param_name, value)
print(f'I: Setting parameter {param_name} to value of {value}')
return True
print(f'W: Failed to set parameter {param_name} to {value}')
return False
def get_nmrparameter(self, param_name: str):
"""Returns the value of an NMR parameter by name.
Parameters
----------
param_name: str
Returns
-------
The value of the parameter: if found
None: Else
"""
try:
return self.get_instance().GetNMRParameter(param_name)
except Exception as e:
print(str(e), repr(e))
print('not a param. try one of:', self.get_page_parameters('Sequence'))
return NoneW
def is_nmrparameter(self, param_name: str):
"""Checks that a given parameter actually exists in the setup.
Parameters
----------
param_name: str
Returns
-------
True: if the parameter exists
False: otherwise.
"""
try:
self.get_instance().GetNMRParameter(param_name)
return True
except:
return False
def get_all_nmrparameters(self):
"""Gets all parameter names and values from all pages of the NMR parameters.
Returns
-------
A dictionary of all parameters, in form { [page name]: { [parameter name]: [parameter value], ... }, ... }
"""
full_dict = {}
pages = self.get_instance().GetParameterPageList.split(",")
for p in pages:
p = p.strip()
sub_dict = self.get_page_parameters(p)
full_dict[p] = sub_dict
return full_dict
def get_page_parameters(self, page):
"""Gets the parameters used for the sequence.
Returns
-------
a dictionary of the sequence parameters.
"""
sub_dict = { }
params_raw = self.get_instance().GetParameterListInPage(page)
params = params_raw[params_raw.find('=')+1:].split(",")
for param in params:
param_stripped = param.strip()
val = self.get_nmrparameter(param_stripped)
if not(val is None):
sub_dict[param_stripped] = str(val)
else:
print(param_stripped)
return sub_dict
def load_sequence(self, filename):
"""WARNING: POSSIBLY DESTRUCTIVE TO DATA
Reads a sequence file and (hopefully) updates the dashboard on the Sequence page. Because of various problems with the TNMR API, this function does the following:
1. Closes the currently active file. ENSURE YOUR DATA IS SAVED BEFORE USING THIS FUNCTION. It will NOT verify that everything is up to date, nor block. Be wise.
2. Opens a template file, located at a predefined location, defined in this file.
3. Loads the given sequence into this template file (tmp.tnt)
4. Saves this as a new template file (tmper.tnt)
5. Closes and reloads the new template file, to make the sequence parameters visible.
There is incredible ''bodging'' (as Tom calls it) in this code, and it is entirely Tecmag's fault, as their API simply doesn't do what it says it does.
Parameters
----------
filename: str
Returns
-------
True: if successful
False: if otherwise. (TODO: Exceptions-based rather than this)
"""
ntnmr = self.get_instance()
if (self.acquisition_running()):
ntnmr.Abort()
print('W: Aborting currently running acquisition!')
print(f'Loading sequence at {filename}')
ntnmr.CloseActiveFile()
success = ntnmr.OpenFile(TEMPLATE_FILE_PATH + 'tmp.tnt')
if(success):
print('Template file reloaded')
else:
print(f'Failed to load template file. Please ensure that there exists an empty .tnt file named {TEMPLATE_FILE_PATH}/tmp.tnt (Close, New, Save As...)')
return False
self.set_activefile()
self.load_dashboard(TEMPLATE_FILE_PATH + 'dashboard.txt')
success = ntnmr.LoadSequence(filename if filename[-4:]=='.tps' else (filename+'.tps'))
if(success):
print(f'Successfully loaded sequence')
else:
print('Failed to load sequence')
return False
ntnmr.SaveAs(TEMPLATE_FILE_PATH + 'tmper.tnt') # even more temporary
success = ntnmr.OpenFile(TEMPLATE_FILE_PATH + 'tmper.tnt') # reload the file so that we can actually read/write to the Sequence parameters (TNMR bug)
self.set_activefile()
success_overall = ntnmr.GetSequenceName() == (filename[:-4] if filename[-4:]=='.tps' else filename).split('/')[-1]
if(success):
print(f'Successfully reloaded')
else:
print('Failed to reload')
return False
if(success_overall):
print(f'I: Successfully loaded sequence from {filename}')
else:
print('W: Filenames do not match for sequence!')
return False
d = self.get_data()
ntnmr.ZeroFill(len(d[0])) # to clear everything out.
return True
def load_dashboard(self, dashboard_fn):
'''Loads a dashboard into TNMR. Resets the parameters to those in the dashboard file, despite what the TNMR documentation says.
Parameters
----------
filename: str, designates the dashboard to load
Returns
-------
A success flag - beware; sometimes, this can fail and still provide a false positive. False negatives, as far as I'm aware, never happen, though, so feel free to rely on that.
'''
print(f'I: Loading dashboard setup from {dashboard_fn}')
success = self.get_instance().LoadParameterSetupFromFile(dashboard_fn)
if(success):
print(f'I: Successfully loaded dashboard')
else:
print(f'W: Failed to load dashboard')
return success