frappy_psi.tnmr: added a module (frappy_psi.tnmr.OTFModule) to interface with the Tecmag NMR (TNMR) program from a frappy server.

This commit is contained in:
2025-06-11 08:50:21 +02:00
parent 2c5d5da773
commit 05324a8966
7 changed files with 1008 additions and 0 deletions

6
cfg/frappy_OTFMod_cfg.py Normal file
View File

@@ -0,0 +1,6 @@
import frappy.core as fc
import TNMRExt.OTFModule as mod
Node('example_TNMR.psi.ch', 'The NMR system running the Scout and controlled with TNMR', interface='tcp://5000')
Mod('tnmr_otf_module', mod.ProgrammedSequence, 'NMR Sequence')

View File

@@ -0,0 +1,268 @@
# -*- coding: utf-8 -*-
"""
TNMR_DG_Extension
______________________
Version: 1.0
Authors: Davis Garrad (Paul Scherrer Institute, CH)
______________________
frappy-based module for generating and running pulse sequences on TNMR (Tecmag).
"""
#On-the-fly!
import TNMRExt.TNMR_DG_Extension as te
import TNMRExt.SequenceGeneration as seq_gen
import frappy.core as fc
import frappy
import win32com
import pythoncom
import numpy as np
import threading
import time
import os
class ProgrammedSequence(fc.Readable):
"""An NMR device being driven by an instance of TNMR. Requires that an instance of TNMR is opened before creation.
Instance Attributes
-------------------
(parameter) title: a title which will be embedded to the sequence files. Use this for identification.
(parameter) sequence_data: a dictionary describing the currently-built sequence
(parameter) value: an array of complexes representing the TNMR data return (technically inherited from Readable)
(parameter) acquisition_time: float (usecs) which describes the length of acquisition
(parameter) ringdown_time: float (usecs) which describes the length of ringdown
(parameter) pre_acquisition_time: float (usecs) which describes the length of time to wait after ringdown finishes (1u is okay)
(parameter) post_acquisition_time: float (ms) which describes the length of time to wait after finishing acquisition
(parameter) acq_phase_cycle: str, the phase cycle to run on acquisition (eg., '0 1 1 2', '0 1 2 3', '1 1 2 2 0 0 3 3 1 2 3 4', ...)
Methods
-------
(cmd) add_pulse: adds a pulse to the end of the sequence.
Arguments:
- pulse_width: pulse width to add to sequence (usec)
- pulse_height: pulse height to add to sequence (a.u.)
- relaxation_time: relaxation time to add to sequence (usec)
- phase_cycle: the phase cycle to run for this pulse (eg., '0 1 1 2', '0 1 2 3', '1 1 2 2 0 0 3 3 1 2 3 4', ...)
(cmd) pop_pulse: removes the last pulse from the sequence
(cmd) compile_sequence: finishes building the sequence, saves it and its configuration to files (timestamped), and sets the system ready to run
(cmd) run: if compiled, starts data acquisition on TNMR (async)
(cmd) compile and run: you can guess
Inherited Attributes
--------------------
name: from Module
logger: from Module
cfgdict: from Module
src: from Module
status: from Readable
value: from Readable
pollinterval: from Readable
"""
# inherited
value = fc.Parameter('data_return', fc.ArrayOf(fc.FloatRange(), maxlen=4096), default=[])
status = fc.Parameter(datatype=frappy.datatypes.StatusType(fc.Readable, "DISABLED", 'PREPARED', 'BUSY'))
pollinterval = fc.Parameter(default=1)
# basic
title = fc.Parameter('title', fc.StringType(), default='Sequence', readonly=False)
sequence_data = fc.Parameter('sequence_config', fc.ArrayOf(fc.StructOf(pulse_width=fc.FloatRange(unit='u'),
pulse_height=fc.FloatRange(),
relaxation_time=fc.FloatRange(unit='u'),
phase_cycle=fc.StringType())))
# sequence edit
pulse_width = fc.Parameter('pulse_width', fc.FloatRange(unit='u'), readonly=False, group='pulse_editor')
pulse_height = fc.Parameter('pulse_height', fc.FloatRange(), readonly=False, group='pulse_editor')
relaxation_time = fc.Parameter('relaxation_time', fc.FloatRange(unit='u', min=0.1), readonly=False, group='pulse_editor')
phase_cycle = fc.Parameter('phase_cycle', fc.StringType(), readonly=False, group='pulse_editor', default='')
# final details
acquisition_time = fc.Parameter('acquisition_time', fc.FloatRange(unit='u'), readonly=True, group='sequence_editor', default=204.8) # this is a limit set by the dwell limit and number of acquisition points (1024, TODO: Make this adjustable)
ringdown_time = fc.Parameter('ringdown_time', fc.FloatRange(unit='u'), readonly=False, group='sequence_editor')
pre_acquisition_time = fc.Parameter('pre_acquisition_time', fc.FloatRange(unit='u'), readonly=False, group='sequence_editor')
post_acquisition_time = fc.Parameter('post_acquisition_time', fc.FloatRange(unit='m'), readonly=False, group='sequence_editor')
acq_phase_cycle = fc.Parameter('acq_phase_cycle', fc.StringType(), readonly=False, group='sequence_editor', default='')
inited = False
### SETUP
def tnmr(self):
if not(self.inited):
self.ntnmr = te.TNMR()
self.inited = True
return self.ntnmr
def initialReads(self):
pass
### COMMANDS
@fc.Command(description="Add Pulse", group='pulse_editor', argument={ 'type': 'struct' }, members={ 'a': { 'type': 'string' }})
def add_pulse(self):
if(self.status == ('PREPARED', 'compiled')):
self.status = ('IDLE', 'ok - uncompiled')
data = list(self.sequence_data) # should be a tuple when it comes out of ArrayOf __call__, so make it mutable
data += [ { 'pulse_width': self.pulse_width, 'pulse_height': self.pulse_height, 'relaxation_time': self.relaxation_time, 'phase_cycle': self.phase_cycle } ]
self.sequence_data = data
@fc.Command(description="Pop Pulse", group='pulse_editor')
def pop_pulse(self):
if(self.status == ('PREPARED', 'compiled')):
self.status = ('IDLE', 'ok - uncompiled')
data = list(self.sequence_data) # should be a tuple when it comes out of ArrayOf __call__, so make it mutable
data = data[:-1] # chop off the tail
self.sequence_data = data
@fc.Command(description="Compile", group='sequence_editor')
def compile_sequence(self):
threading.Thread(target=lambda s=self: s.__compile_sequence()).start()
@fc.Command(description="Run")
def run(self):
threading.Thread(target=lambda s=self: s.__zero_go()).start()
@fc.Command(description="Compile & Run")
def compile_and_run(self):
threading.Thread(target=lambda s=self: s.__compile_and_run()).start()
### READ/WRITE
def read_value(self):
return self.value # TODO: this is only reals
def read_title(self):
return self.title
def write_title(self, t):
self.title = t
self.status = ('IDLE', 'ok - uncompiled')
return self.read_title()
def read_sequence_data(self):
return self.sequence_data
def read_pulse_width(self):
return self.pulse_width
def write_pulse_width(self, t):
self.pulse_width = t
return self.read_pulse_width()
def read_pulse_height(self):
return self.pulse_height
def write_pulse_height(self, t):
self.pulse_height = t
return self.read_pulse_height()
def read_relaxation_time(self):
return self.relaxation_time
def write_relaxation_time(self, t):
self.relaxation_time = t
return self.read_relaxation_time()
def read_phase_cycle(self):
return self.phase_cycle
def write_phase_cycle(self, t):
self.phase_cycle = t
return self.read_phase_cycle()
def read_acquisition_time(self):
return self.acquisition_time
def write_acquisition_time(self, t):
self.acquisition_time = t
self.status = ('IDLE', 'ok - uncompiled')
return self.read_acquisition_time()
def read_ringdown_time(self):
return self.ringdown_time
def write_ringdown_time(self, t):
self.ringdown_time = t
self.status = ('IDLE', 'ok - uncompiled')
return self.read_ringdown_time()
def read_pre_acquisition_time(self):
return self.pre_acquisition_time
def write_pre_acquisition_time(self, t):
self.pre_acquisition_time = t
self.status = ('IDLE', 'ok - uncompiled')
return self.read_pre_acquisition_time()
def read_post_acquisition_time(self):
return self.post_acquisition_time
def write_post_acquisition_time(self, t):
self.post_acquisition_time = t
self.status = ('IDLE', 'ok - uncompiled')
return self.read_post_acquisition_time()
def read_acq_phase_cycle(self):
return self.acq_phase_cycle
def write_acq_phase_cycle(self, t):
self.acq_phase_cycle = t
self.status = ('IDLE', 'ok - uncompiled')
return self.read_acq_phase_cycle()
### PRIVATE (Utility)
def __compile_sequence(self):
if(self.status != ('PREPARED', 'compiled')) and (self.status[0] != 'BUSY'):
self.status = ('BUSY', 'compiling')
# first, create the sequence
seq = seq_gen.get_initial_block()
i = 0
for s in self.sequence_data:
seq = seq_gen.combine_blocks(seq, seq_gen.get_single_pulse_block(f'pulse_{i}', str(s['pulse_width']) + 'u',
str(s['pulse_height']),
str(s['relaxation_time']) + 'u',
str(s['phase_cycle'])))
i += 1
seq = seq_gen.combine_blocks(seq, seq_gen.get_final_block(str(self.ringdown_time) + 'u',
str(self.pre_acquisition_time) + 'u',
str(self.acquisition_time) + 'u',
str(self.post_acquisition_time) + 'm',
str(self.acq_phase_cycle)))
# then, save the thing
filepath = os.getcwd()
filename = self.title + f'_{time.time()}'
filename = filepath + '/' + filename.replace('.','')
seq_gen.save_sequence(filename, seq)
seq_gen.save_sequence_cfg(filename, seq)
# then, load the thing into TNMR
self.tnmr().load_sequence(filename)
# finally, let ourselves know we're ready
self.status = ('PREPARED', 'compiled')
def __zero_go(self):
if(self.status[0] != 'BUSY'):
self.status = ('BUSY', 'acquiring')
self.tnmr().ZeroGo(lock=True, interval=0.5)
self.value = self.tnmr().get_data()[0] # TODO: this is only reals...
print(self.value)
self.status = ('PREPARED', 'compiled')
def __compile_and_run(self):
self.__compile_sequence()
self.__zero_go()

View File

@@ -0,0 +1,105 @@
# -*- coding: utf-8 -*-
"""
SequenceGeneration
______________________
Version: 1.0
Authors: Davis Garrad (Paul Scherrer Institute, CH)
______________________
Wrapper for the API I wrote to generate pulse sequences programmatically in TNMR (Tecmag).
"""
import TNMRSeq.sequence_generator as se
from pydantic.utils import deep_update
import json
def get_single_pulse_block(name, pulse_width, pulse_height, relaxation_time, phase_cycle='0'):
'''Generates a single block of data to create a sequence with.
Parameters
----------
name: str, just the prefix for the column names. Ensure this is unique over the whole sequence!
pulse_width: str, in the format '10u' (10 microseconds)
pulse_height: str, in the format '40' (I'm not honestly sure what units this is in...)
relaxation_time: str, in the format '10u'
phase_cycle: a given phase cycle (steps of 4, 0-3 inc.) (eg., '0 0 1 1 2 3 0 1', etc.)
Returns
-------
a dictionary which can be updated with others to generate a larger, more complex sequence.
'''
ph = name + '_phase'
rl = name + '_relaxation'
block = se.generate_default_sequence([ ph, rl ], [ pulse_width, relaxation_time ])
# COLUMNNS
# PH column
block['columns'][ph]['F1_Ampl']['value'] = str(pulse_height)
block['columns'][ph]['Delay'] = str(pulse_width)
block['columns'][ph]['F1_UnBlank']['value'] = '1'
block['columns'][ph]['Rx_Blank']['value'] = '1'
# relaxation column
block['columns'][rl]['F1_UnBlank']['value'] = '1'
block['columns'][rl]['Rx_Blank']['value'] = '1'
if(phase_cycle != ''):
table_name = f'ph_{name}'
block['columns'][ph]['F1_Ph']['table'] = table_name
block['tables'][table_name] = { 'values': phase_cycle, 'typestr': 'HP', 'start': 1 }
return block
def get_initial_block():
block = se.generate_default_sequence(['Phase reset', 'Unblank'], ['1u', '10u'])
# Phase reset
block['columns']['Phase reset']['F1_PhRst']['value'] = '1'
# Unblank
block['columns']['Unblank']['F1_UnBlank']['value'] = '1'
block['columns']['Unblank']['Rx_Blank']['value'] = '1'
return block
def get_final_block(ringdown_time, preacquire_time, acquire_time, cooldown_time, acq_phase_cycle='0'):
'''Generates the final block of data to create a sequence with.
Parameters
----------
ringdown_time: str, of format '10u', how long to ringdown
preacquire_time: str, in the format '10u', how long to wait after ringdown before acquiring data. A good default is 1 usec
acquire_time: str, in the format '10u', how long to acquire data for
cooldown_time: str, in the format '40u', how long to wait after acquisition.
acq_phase_cycle: str, the phase cycle that the acquisition should follow
Returns
-------
a dictionary which can be updated with others to generate a larger, more complex sequence.
'''
block = se.generate_default_sequence(['Ringdown', 'RX On', 'Acquisition', 'Finish', ''], [ringdown_time, preacquire_time, acquire_time, cooldown_time, '1u'])
block = se.generate_default_sequence(['Ringdown', 'RX On', 'Acquisition', 'Finish', ''], [ringdown_time, preacquire_time, acquire_time, cooldown_time, '1u'])
# ringdown
block['columns']['Ringdown']['Rx_Blank']['value'] = '1'
# Acquire
block['columns']['Acquisition']['Acq']['value'] = '1'
#block['columns']['Acquisition']['Acq_phase']['value'] = '0'
if(acq_phase_cycle != ''):
block['columns']['Acquisition']['Acq_phase']['table'] = 'phacq'
block['tables']['phacq'] = { 'values': acq_phase_cycle, 'typestr': 'HP', 'start': 1 }
return block
def combine_blocks(l, r):
return deep_update(l, r)
def save_sequence(filename, sequence):
se.create_sequence_file(filename, sequence)
def save_sequence_cfg(filename, sequence):
with open(filename + '.cfg', 'w') as file:
json.dump(sequence, file, indent=4)
file.close()

View File

@@ -0,0 +1,320 @@
# -*- coding: utf-8 -*-
"""
TNMR_DG_Extension
______________________
Version: 1.0
Authors: Davis Garrad (Paul Scherrer Institute, CH)
______________________
Wrapper for communication with TecMag TNMR software, specifically in the context of the NMR setup in Gediminas Simutis' Lab at PSI, CH.
"""
import os
TEMPLATE_FILE_PATH = os.path.dirname(os.path.realpath(__file__)) + '/templates/' # TODO: Make some sort of installer/initialiser that sets this all up...
import win32com.client
import pythoncom
import time
import json
class TNMRNotRunnningError(Exception):
def __init__(self, msg=None):
if msg is None:
msg = "No instance of TNMR running. Start TNMR and try again."
super(TNMRNotRunnningError, self).__init__(msg)
class TNMR:
""" This class allows to communicate with Tecmag TNMR software for easier acquisition
control.
Instance Attributes
-------------------
NMTNR: win32com object
ACTIVEFILE: str
path to active TNMR file
Methods
-------
openfile(filepath: str, active: bool):
opens the tnt file specified by filepath
if active is true, the newly opened file will be set to ACTIVEFILE
set_activefile():
set ACTIVEFILE to current TNMR active doc path
ZeroGo(lock: bool, interval: float):
if possible, starts an experiment
if lock is true, the program will be blocked from further interaction until
the acqusition endswith
acquisition_running():
returns the acquisition status of TNMR
"""
def __init__(self, filepath = "", NTNMR_inst=None):
""" Creates an instance of the NTNMR class, which is used to communicate with TNMR,
Tecmags control software. Basically a wrapper for TNMRs api
Parameters
----------
filepath: specifies a path to the file tnt you want to use
"""
#first we check if an instance of TNMR is running an get it or create it
print('Opening TNMR connection')
try:
if(NTNMR_inst is None):
self.NTNMR = win32com.client.GetActiveObject("NTNMR.Application")
else:
self.NTNMR = NTNMR_inst
except pythoncom.com_error:
raise TNMRNotRunnningError
# next we open a specified file. If none is specified, then we use the active file
if filepath != "":
print(f'Loading file {filepath}')
self.NTNMR.OpenFile(filepath)
self.ACTIVEFILE = self.NTNMR.GetActiveDocPath
self.ACTIVEPATH = os.path.dirname(self.ACTIVEFILE)
print(f'Active file: {self.ACTIVEFILE} in path {self.ACTIVEPATH}')
def execute_cmd(self, cmd):
print('W: Executing arbitrary command: ' + f'out = self.NTNMR.{cmd}')
out = 0
exec(f'out = self.NTNMR.{cmd}\nprint("W: OUTPUT: " + str(out))')
return out
def openfile(self, filepath, active = True):
""" Opens a new file. Per default, the new file will be selected as active.
Set active = False to prevent this.
Parameters
----------
filepath: str
path to tnt file. Make sure to pass a raw string (i.e. backslashes are
escaped)
active: bool
"""
print(f'Opening file {filepath}')
self.NTNMR.OpenFile(filepath)
if active:
self.ACTIVEFILE = self.NTNMR.GetActiveDocPath
print(f'Active file: {self.ACTIVEFILE} in path {self.ACTIVEPATH}')
def set_activefile(self):
""" Sets TNMR active doc path to ACTIVEFILE
"""
self.ACTIVEFILE = self.NTNMR.GetActiveDocPath
self.ACTIVEPATH = os.path.dirname(self.ACTIVEFILE)
print(f'Active file: {self.ACTIVEFILE} in path {self.ACTIVEPATH}')
def ZeroGo(self, lock = True, interval = 0.5):
""" If possible, zeros and starts acquisition
Parameters
----------
lock: bool
if true, program waits until acquisition is done
interval: float
how often to check if acquisition done
"""
# for some reason CheckAcquisition is False while an experiment is
# running but true otherwise
print('Zero-going...')
if self.NTNMR.CheckAcquisition == True:
self.NTNMR.ZG
else:
print('An Acquisition is already running')
if lock:
print("Application locked during acquisition\n...waiting...")
while self.NTNMR.CheckAcquisition == False:
time.sleep(interval)
print("Acquisition done")
def acquisition_running(self):
""" Checks if acquisition is running
Returns
-------
True: if running
False: if not running
"""
return not(self.NTNMR.CheckAcquisition)
def get_data(self):
raw_data = self.NTNMR.GetData
reals = raw_data[::2]
imags = raw_data[1::2]
return (reals, imags)
def save_file(self, filepath=''):
""" Save file to filepath. if no filepath specified, save current active file
Parameters
----------
filepath: str
"""
print('I: Saving')
if filepath == '':
self.NTNMR.Save
else:
self.NTNMR.SaveAs(filepath)
print(f'I: Saved to file {filepath}')
def set_nmrparameter(self, param_name: str, value: str):
"""Sets the value of an NMR parameter by name.
Parameters
----------
param_name: str
value: str
Returns
-------
True: if successful
False: otherwise.
"""
if(self.is_nmrparameter(param_name)):
self.NTNMR.SetNMRParameter(param_name, value)
print(f'I: Setting parameter {param_name} to value of {value}')
return True
print(f'W: Failed to set parameter {param_name} to {value}')
return False
def get_nmrparameter(self, param_name: str):
"""Returns the value of an NMR parameter by name.
Parameters
----------
param_name: str
Returns
-------
The value of the parameter: if found
None: Else
"""
try:
return self.NTNMR.GetNMRParameter(param_name)
except:
print('not a param. try one of:', self.get_page_parameters('Sequence'))
return None
def is_nmrparameter(self, param_name: str):
"""Checks that a given parameter actually exists in the setup.
Parameters
----------
param_name: str
Returns
-------
True: if the parameter exists
False: otherwise.
"""
try:
self.NTNMR.GetNMRParameter(param_name)
return True
except:
return False
def get_all_nmrparameters(self):
"""Gets all parameter names and values from all pages of the NMR parameters.
Returns
-------
A dictionary of all parameters, in form { [page name]: { [parameter name]: [parameter value], ... }, ... }
"""
full_dict = {}
pages = self.NTNMR.GetParameterPageList.split(",")
for p in pages:
p = p.strip()
sub_dict = self.get_page_parameters(p)
full_dict[p] = sub_dict
return full_dict
def get_page_parameters(self, page):
"""Gets the parameters used for the sequence.
Returns
-------
a dictionary of the sequence parameters.
"""
sub_dict = { }
params_raw = self.NTNMR.GetParameterListInPage(page)
params = params_raw[params_raw.find('=')+1:].split(",")
for param in params:
param_stripped = param.strip()
val = self.get_nmrparameter(param_stripped)
if not(val is None):
sub_dict[param_stripped] = str(val)
else:
print(param_stripped)
return sub_dict
def load_sequence(self, filename):
"""WARNING: POSSIBLY DESTRUCTIVE TO DATA
Reads a sequence file and (hopefully) updates the dashboard on the Sequence page. Because of various problems with the TNMR API, this function does the following:
1. Closes the currently active file. ENSURE YOUR DATA IS SAVED BEFORE USING THIS FUNCTION. It will NOT verify that everything is up to date, nor block. Be wise.
2. Opens a template file, located at a predefined location, defined in this file.
3. Loads the given sequence into this template file (tmp.tnt)
4. Saves this as a new template file (tmper.tnt)
5. Closes and reloads the new template file, to make the sequence parameters visible.
There is incredible ''bodging'' (as Tom calls it) in this code, and it is entirely Tecmag's fault, as their API simply doesn't do what it says it does.
Parameters
----------
filename: str
Returns
-------
True: if successful
False: if otherwise. (TODO: Exceptions-based rather than this)
"""
print(f'Loading sequence at {filename}')
self.NTNMR.CloseActiveFile
success = self.NTNMR.OpenFile(TEMPLATE_FILE_PATH + 'tmp.tnt')
if(success):
print('Template file reloaded')
else:
print(f'Failed to load template file. Please ensure that there exists an empty .tnt file named {TEMPLATE_FILE_PATH}/tmp.tnt (Close, New, Save As...)')
return False
self.set_activefile()
success = self.NTNMR.LoadSequence(filename if filename[-4:]=='.tps' else (filename+'.tps'))
if(success):
print(f'Successfully loaded sequence')
else:
print('Failed to load sequence')
return False
self.NTNMR.SaveAs(TEMPLATE_FILE_PATH + 'tmper.tnt') # even more temporary
success = self.NTNMR.OpenFile(TEMPLATE_FILE_PATH + 'tmper.tnt') # reload the file so that we can actually read/write to the Sequence parameters (TNMR bug)
self.set_activefile()
if(success):
print(f'Successfully reloaded')
else:
print('Failed to reload')
return False
print(f'I: Successfully loaded sequence from {filename}')
return True
def load_dashboard(self, dashboard_fn):
print(f'I: Loading dashboard setup from {dashboard_fn}')
success = self.NTNMR.LoadParameterSetupFromFile(dashboard_fn)
if(success):
print(f'I: Successfully loaded dashboard')
else:
print(f'W: Failed to load dashboard')
return success

View File

@@ -0,0 +1,309 @@
# HEADER SYNTAX
# TODO: See if I can't get my name into this...
# PSEQ1.001.18 BIN
# # 0*3 [filename]
# 0x4 0*3 0*4
# # 0*3 [user]
# 0x8 0*3 (2 numbers)Ah0000 (timestamp) (4 bytes, in LSB first - little endian, then 4 zeros)
# 0x12 0*3 (device control 2)
# (small number, 0x2, 0x4) 0*3 (same as previous) 0*15 0x1
# 0*11
# 0x1 0*3 [space]
# # 0*3 Name:
# # 0*3 [name]
# 0*56
# # 0*3 [col 1 name]
# 0*56
# # 0*3 [col 2 name]
# ...
# 0*56
#
# DELAY SYNTAX
# (same number as above) 0*3 0x1 0*7 0x1 0*3 0x1 (don't ask)
# 0*11
# # 0*3 [default]
# # * 0*3 Delay
# # * 0*3 Delay
# 0*56
# # 0*3 [val 1]
# 0*56
# # 0*3 [val 2]
# ...
# 0*56
# EVENT SYNTAX
#
# (this part specifies the section)
# 0x2 0*3 [some number] 1 [X] [Y]
# 0x1 0*3 [Z] [W] 0*2 0x1
#
# (this part specifies the subsection)
# 0x11
# # 0*3 [default value]
# # 0*3 [event type (submenu)]
# # 0*3 [event type (submenu)]
# 0*8
# [ (# 0*3 [table 1D name] 0x1) if table exists, else (nothing) ]
# 0*8
# [ (# 0*3 [table 2D name] 0x1) if table exists, else (nothing) ]
# 0*8
# ...
# 0*8
# 0*11
# # 0*3 [value]
# 0*56
# 0*3 [value 2]
# 0*56
# ...
#
# (this part specifies another subsection) (only for CTRL section)
# 0x2
# 0*3 0xd (seemingly random identifiers)
# 0*3 0x18
# 0*3 0xd
# 0*3 0x1
# 0x11
# # 0*3 [value]
# # 0*3 [event type (submenu)]
# # 0*3 [event type (submenu)]
# 0*56
# # 0*3 [value]
# 0*56
# TABLE SPEC SYNTAX
# 0*56
# 0*56
# 0*11
# 0*8
# [num_tables]
# 0*3
# # 0*3 [table_1_name]
# # 0*3 [table values] 0*16 [type] (normally just HP)
# 0*2 [starting offset (in hex) - 1 is default] 0*3 0x4 8)7 0x1 0*31 0x5
# 0*4
# ...
# 0*10 0x0
Z = '\x00' # for my sanity
event_codes = {
'F1_Ampl': ('\x1f', 'P', 'E', '3', 'R', '\x08'), # F1
'F1_Ph': ('\x05', 'M', 'E', 'H', 'P', '\x02'),
'F1_PhMod': ('\x17', 'P', 'E', '3', 'P', '\x08'),
'F1_HOP': ('\x0d', 'M', 'E', 'X', 'T', '\x01'),
'F1_UnBlank': ('\x17', 'M', 'E', 'X', 'T', '\x01'),
'F1_PhRst': ('\x00', 'M', 'E', 'X', 'T', '\x01'), # RST
'F1_ExtTrig': ('\x09', 'M', 'E', 'X', 'T', '\x01'),
'Scope_Trig': ('\x14', 'M', 'E', 'X', 'T', '\x01'), # CTRL (first)
'Loop': ('\x0d', '\x18', '\x0d', '\x01'),
'CBranch': ('\x0e', '\x00', '\x0e', '\x01'),
'CTest': ('\x0f', '\x00', '\x0f', '\x01'),
'Ext_Trig': ('\x10', '\x00', '\x10', '\x01'),
'RT_Update': ('\x11', '\x00', '\x11', '\x01'),
'Acq': ('\x17', 'Q', 'A', 'C', 'A', '\x18'), # ACQ
'Acq_phase': ('\x03', 'Q', 'A', 'H', 'P', '\x02'),
'Rx_Blank': ('\x0f', 'M', 'E', 'X', 'T', '\x01'), # RX
}
event_types = list(event_codes.keys())
event_defaults = { 'F1_Ampl': 0, # F1
'F1_PhMod': -1,
'F1_Ph': -1,
'F1_UnBlank': 0,
'F1_HOP': 0,
'F1_PhRst': 0, # RST
'F1_ExtTrig': 0,
'Ext_Trig': 0, # CTRL
'Loop': 0,
'Scope_Trig': 0, # special CTRL (first)
'CBranch': 0,
'CTest': 0,
'RT_Update': 0,
'Acq': 0, # ACQ
'Acq_phase': -1,
'Rx_Blank': 0, # RX
}
def fm(s, spacing=3):
'''formats a string with its length, then 3 zeroes, then the str'''
a = f'{chr(len(s))}{Z*spacing}{s}'
return a
def get_info_header(filename, author, col_names, tuning_number, binary_name='PSEQ1.001.18 BIN'):
headerstr = ''
headerstr += 'PSEQ1.001.18 BIN'
headerstr += fm(filename)
headerstr += f'\x04{Z*3}{Z*4}'
headerstr += fm(author)
headerstr += f'\x08{Z*3}\xda\xf1\x50\x00{Z*4}' # TODO: Timestamp, but right now it doesn't really matter.
headerstr += f'\x12{Z*3}'
headerstr += f'{tuning_number}{Z*3}{tuning_number}{Z*15}\x01{Z*11}'
headerstr += fm(' ')
headerstr += fm('Name:')
headerstr += fm('Name:')
headerstr += Z*56
for i in col_names:
headerstr += fm(i)
headerstr += Z*56
return headerstr
def get_delay_header(col_delays, tuning_number):
headerstr = ''
headerstr += f'{tuning_number}{Z*3}\x01{Z*7}\x01{Z*3}\x01'
headerstr += Z*11
headerstr += fm('1u')
headerstr += fm('Delay')
headerstr += fm('Delay')
headerstr += Z*56
for i in col_delays:
headerstr += fm(str(i))
headerstr += Z*56
return headerstr
def get_event_header(event_type, vals, tables, table_reg, tuning_number, col_delays):
'''Generates the file information for the events section.
Params
------
event_type: str describing the event
vals: an array of strs to be written.
tables: an array of dictionaries of table names to be written in format [ { '1D': None/str } (for col0), { '1D': None/str } (for col1) ... ]
'''
codes = event_codes[event_type]
headerstr = ''
if(len(codes) == 6): # regular header
headerstr += f'{tuning_number}{Z*3}{codes[0]}1{codes[1]}{codes[2]}'
headerstr += f'{codes[5]}{Z*3}{codes[3]}{codes[4]}{Z*2}\x01'
elif(len(codes) == 4): # exteension
headerstr += f'{tuning_number}{Z*3}{codes[0]}{Z*3}{codes[1]}{Z*3}{codes[2]}{Z*3}{codes[3]}'
else:
print('PANIC')
raise Exception
headerstr += Z*11
headerstr += fm(str(event_defaults[event_type]))
headerstr += fm(event_type)
headerstr += fm(event_type)
headerstr += Z*56
for i in range(len(vals)):
headerstr += fm(str(vals[i]))
if(event_type == 'Acq' and str(vals[i]) == '1'):
acq_points = 1024
sweep = '2500000.0Hz'
filtr = '2500000.0Hz'
dwell = '200.0n' # hard limit apparently...
#acq_length = col_delays[i]
#if('u' in acq_length):
# acq_length = float(acq_length.strip()[:-1])
#elif('m' in acq_length):
# acq_length = 1000*float(acq_length.strip()[:-1])
#elif('n' in acq_length):
# acq_length = 0.001*float(acq_length.strip()[:-1])
#dwell = acq_length / acq_points
#dwell = str(dwell) + 'u'
headerstr += Z*52
headerstr += f'\x01{Z*3}' + fm(str(acq_points))
headerstr += fm(str(sweep))
headerstr += fm(str(filtr))
headerstr += fm(str(dwell))
headerstr += fm(str(col_delays[i]))
headerstr += f'\x00{Z*5}' # I believe this is to link to dashboard... (set to zero or else it will just go to default)
else:
if not(tables[i] in list(table_reg.keys())):
headerstr += Z*56
else:
headerstr += Z*8 + fm(tables[i]) + '\x01' # 1D
headerstr += Z*43
return headerstr
def get_table_spec(tables):
'''Generates the file information for a set of tables.
Parameters
----------
tables: a dictionary of form { [table_name]: { 'values': '1 2 3', 'typestr': 'HP', 'start': 1 } }
'''
specstr = ''
specstr += Z*56
specstr += Z*56
specstr += Z*11
specstr += Z*5
if(len(list(tables.keys())) > 0):
specstr += Z*3
specstr += chr(len(list(tables.keys())))
for t in list(tables.keys()):
specstr += Z*3
specstr += fm(t)
specstr += fm(tables[t]['values']) + Z*16 + tables[t]['typestr']
specstr += Z*2 + chr(tables[t]['start']) + Z*3 + chr(0x04) + Z*7 + chr(0x01) + Z*31 + chr(0x05) + Z*4
specstr += Z*10+Z
return specstr
def generate_default_sequence(col_names, col_delays):
'''Generates a dictionary for use in create_sequence_file, and populates it with all the default values as specified by event_defaults and delays.
Parameters
----------
col_names: an iterable of each of the column titles
col_delays: an iterable of each of the column delay values.
Returns
-------
a data dictionary in the form required by create_sequence_file.
'''
full_dict = { 'columns': {}, 'tables': {} }
print(full_dict)
for c, delay in zip(col_names, col_delays):
sub_dict = {}
for e in event_types:
sub_dict[e] = { 'value': str(event_defaults[e]), 'table': '' }
sub_dict['Delay'] = delay
full_dict['columns'][c] = sub_dict.copy()
return full_dict
def create_sequence_file(filename, data, author='NA'):
'''Generates a Tecmag sequence file for use in Tecmag NMR (TNMR).
Parameters
----------
filename: str
data: a dictionary in the form { 'columns': { [column_name_0]: { 'F1_Ampl': [value], ..., 'Rx_Blank': [value], 'Delay': [value] }, ... }, 'tables': { 'table_1': {...}, ... } }. If any sub-entries are empty, they will be given default values (requires that all event_types are present). See event_types and event_defaults.
author [optional]: str to describe the file creator.
'''
content = ''
column_names = list(data['columns'].keys())
tuning_number = (len(column_names)+1).to_bytes().decode('utf-8')
column_delays = []
for i in column_names:
column_delays += [ str(data['columns'][i]['Delay']) ]
content += get_info_header(filename, author, column_names, tuning_number)
content += get_delay_header(column_delays, tuning_number)
for evnt in event_types:
evnt_data_values = []
evnt_data_tables = []
for i in column_names:
evnt_data_values += [ str(data['columns'][i][evnt]['value']) ]
evnt_data_tables += [ data['columns'][i][evnt]['table'] ]
content += get_event_header(evnt, evnt_data_values, evnt_data_tables, data['tables'], tuning_number, column_delays)
content += ' '
content += get_table_spec(data['tables'])
with open(f'{filename}' if '.tps' in filename[-4:] else f'{filename}.tps', 'bw') as file:
bs = []
for char in content:
bs += [ord(char)]
file.write(bytes(bs))
file.close()

BIN
frappy_psi/tnmr/tmp.tnt Normal file

Binary file not shown.

BIN
frappy_psi/tnmr/tmper.tnt Normal file

Binary file not shown.