thermofisher updates

Change-Id: I6e7103e87cb3c1e69ef4d0e16d06e3d0fc3729ea
This commit is contained in:
Oksana Shliakhtun
2023-05-17 16:02:41 +02:00
parent 1171245704
commit 04b9bed7c9
2 changed files with 147 additions and 86 deletions

View File

@ -16,96 +16,95 @@
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA # 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
# #
# Module authors: # Module authors:
# Daniel Margineda <daniel.margineda@psi.ch> # Daniel Margineda <daniel.margineda@psi.ch>, Oksana Shliakhtun <oksana.shliakhtun@psi.ch>
# ***************************************************************************** # *****************************************************************************
"""Signal Recovery SR7270: lockin amplifier for AC susceptibility""" """Signal Recovery SR7270: lockin amplifier for AC susceptibility"""
from frappy.core import Readable, Parameter, Command, FloatRange, TupleOf, \ from frappy.core import Readable, Parameter, FloatRange, TupleOf, \
HasIO, StringIO, Attached, IntRange, BoolType, EnumType HasIO, StringIO, Attached, IntRange, BoolType, Writable, EnumType
class SR7270(StringIO): class SR7270_IO(StringIO):
end_of_line = b'\x00' end_of_line = b'\x00'
identification = [('ID', r'.*')] # Identification
def communicate(self, command): # remove dash from terminator
reply = super().communicate(command) class Ametek(StringIO):
def get_par(self, cmd): # remove dash from terminator
reply = StringIO.communicate(self, cmd)
status = self._conn.readbytes(2, 0.1) # get the 2 status bytes status = self._conn.readbytes(2, 0.1) # get the 2 status bytes
return reply + ';%d;%d' % tuple(status) return reply + ';%d;%d' % tuple(status)
def comm(self, cmd):
reply, status, overload = self.get_par(cmd).split(';')
if overload != '0':
self.status = self.Status.WARN, f'overload {overload}'
self.status = self.Status.IDLE, ''
return reply
class XY(HasIO, Readable): class XY(HasIO, Readable):
x = Attached() ioClass = SR7270
y = Attached()
freq_arg = Attached()
amp_arg = Attached()
tc_arg = Attached()
phase_arg = Attached()
dac_arg = Attached()
# parameters required an initial value but initwrite write the default value for polled parameters # parameters required an initial value but initwrite write the default value for polled parameters
value = Parameter('X, Y', datatype=TupleOf(FloatRange(unit='V'), FloatRange(unit='V'))) value = Parameter('X, Y', datatype=TupleOf(FloatRange(unit='V'), FloatRange(unit='V')))
# separate module (Writable)
freq = Parameter('exc_freq_int', # separate module (Writable)
FloatRange(0.001, 250e3, unit='Hz'), class Frequency(Ametek, Writable):
value = Parameter('exc_freq_int', FloatRange(0.001, 250e3, unit='Hz'),
readonly=False, default=1000) # initwrite=True, readonly=False, default=1000) # initwrite=True,
# separate module (Writable)
amp = Parameter('exc_volt_int', def read_freq(self):
return self.comm('OF.')
def write_freq(self, value):
self.comm(f'OF. {value}')
return value
# separate module (Writable)
class Amp(Ametek, Writable):
value = Parameter('exc_volt_int',
FloatRange(0.00, 5, unit='Vrms'), FloatRange(0.00, 5, unit='Vrms'),
readonly=False, default=0.1) # initwrite=True, readonly=False, default=0.1) # initwrite=True,
# unify the following # unify the following
range = Parameter('sensitivity value', FloatRange(0.00, 1, unit='V'), default=1) range = Parameter('sensitivity value', FloatRange(0.00, 1, unit='V'), default=1)
irange = Parameter('sensitivity index', IntRange(0, 27), readonly=False, default=25) irange = Parameter('sensitivity index', IntRange(0, 27), readonly=False, default=25)
autorange = Parameter('autorange_on', EnumType('autorange', off=0, soft=1, hard=2), autorange = Parameter('autorange_on', EnumType('autorange', off=0, soft=1, hard=2), readonly=False,
readonly=False, default=0) # , initwrite=True default=0) # , initwrite=True
# unify the following # unify the following
tc = Parameter('time constant value', FloatRange(10e-6, 100, unit='s'), default=0.1) tc = Parameter('time constant value', FloatRange(10e-6, 100), unit='s', default=0.1)
itc = Parameter('time constant index', IntRange(0, 30), readonly=False, default=14) itc = Parameter('time constant index', IntRange(0, 30), default=14, readonly=False)
nm = Parameter('noise mode', BoolType(), readonly=False, default=0) nm = Parameter('noise mode', BoolType(), readonly=False, default=0)
phase = Parameter('Reference phase control', FloatRange(-360, 360, unit='deg'), phase = Parameter('Reference phase control', FloatRange(-360, 360, unit='deg'),
readonly=False, default=0) readonly=False, default=0)
# convert to enum # convert to enum
vmode = Parameter('Voltage input configuration', IntRange(0, 3), readonly=False, default=3) vmode = Parameter('control mode', IntRange(0, 3), readonly=False, default=3)
# dac = Parameter('output DAC channel value', datatype=TupleOf(IntRange(1, 4), FloatRange(0.0, 5000, unit='mV')), # dac = Parameter('output DAC channel value', datatype=TupleOf(IntRange(1, 4), FloatRange(0.0, 5000, unit='mV')),
# readonly=False, initwrite=True, default=(3,0)) # readonly=False, initwrite=True, default=(3,0))
# dac = Parameter('output DAC channel value', FloatRange(-10000, 10000, unit='mV'), # dac = Parameter('output DAC channel value', FloatRange(-10000, 10000, unit='mV'),
# readonly=False, initwrite=True, default=0) # readonly=False, initwrite=True, default=0)
ioClass = SR7270 # OS - there should be parameter+command IMODE before
def comm(self, command):
reply, status, overload = self.communicate(command).split(';')
if overload != '0':
self.status = self.Status.WARN, 'overload %s' % overload
else:
self.status = self.Status.IDLE, ''
return reply
def read_value(self): def read_value(self):
reply = self.comm('XY.').split(',') reply = self.comm('XY.').split(',')
x = float(reply[0]) x = float(reply[0])
y = float(reply[1]) y = float(reply[1])
if self.autorange == 1: # soft if self.autorange == 1: # soft
if max(abs(x), abs(y)) >= 0.9*self.range and self.irange < 27: if max(abs(x), abs(y)) >= 0.9 * self.range and self.irange < 27:
self.write_irange(self.irange+1) self.write_irange(self.irange + 1)
elif max(abs(x), abs(y)) <= 0.3*self.range and self.irange > 1: elif max(abs(x), abs(y)) <= 0.3 * self.range and self.irange > 1:
self.write_irange(self.irange-1) self.write_irange(self.irange - 1)
self._x.value = x # to update X,Y classes which will be the collected data. self._x.value = x # to update X,Y classes which will be the collected data.
self._y.value = y self._y.value = y
return x, y return x, y
def read_freq(self):
reply = self.comm('OF.')
return reply
def write_freq(self, value):
self.comm('OF. %g' % value)
return value
def write_autorange(self, value): def write_autorange(self, value):
if value == 2: # hard if value == 2: # hard
self.comm('AS') # put hardware autorange on self.comm('AS') # put hardware autorange on
@ -129,16 +128,16 @@ class XY(HasIO, Readable):
return reply return reply
def write_amp(self, value): def write_amp(self, value):
self.comm('OA. %g' % value) self.comm(f'OA. {value}')
return value return value
# external output DAC # external output DAC
#def read_dac(self): # def read_dac(self):
# # reply = self.comm('DAC %g' % channel) # failed to add the DAC channel you want to control # # reply = self.comm('DAC %g' % channel) # failed to add the DAC channel you want to control
# reply = self.comm('DAC 3') # stack to channel 3 # reply = self.comm('DAC 3') # stack to channel 3
# return reply # return reply
#def write_dac(self, value): # def write_dac(self, value):
# # self.comm('DAC %g %g' % channel % value) # # self.comm('DAC %g %g' % channel % value)
# self.comm('DAC 3 %g' % value) # self.comm('DAC 3 %g' % value)
# return value # return value
@ -178,7 +177,6 @@ class XY(HasIO, Readable):
def read_itc(self): def read_itc(self):
reply = self.comm('TC') reply = self.comm('TC')
return reply return reply
# phase and autophase # phase and autophase
@ -187,11 +185,10 @@ class XY(HasIO, Readable):
return reply return reply
def write_phase(self, value): def write_phase(self, value):
self.comm('REFP %d' % round(1000*value, 0)) self.comm('REFP %d' % round(1000 * value, 0))
self.read_phase() self.read_phase()
return value return value
@Command()
def aphase(self): def aphase(self):
"""auto phase""" """auto phase"""
self.read_phase() self.read_phase()
@ -204,8 +201,8 @@ class XY(HasIO, Readable):
# return reply # return reply
def write_vmode(self, value): def write_vmode(self, value):
self.comm('VMODE %d' % value) self.comm(f'IMODE {0}') # voltage enabled
# self.read_vmode() self.comm(f'VMODE {value}')
return value return value

View File

@ -18,6 +18,60 @@
# Module authors: # Module authors:
# Oksana Shliakhtun <oksana.shliakhtun@psi.ch> # Oksana Shliakhtun <oksana.shliakhtun@psi.ch>
# ***************************************************************************** # *****************************************************************************
""" RUFS Command: Description of Bits
====== ======================================================== ==============================================
Value Description
====== ======================================================== ==============================================
V1
B6: warning, rtd1 (internal temp. sensor) is shorted
B0 --> 1
B7: warning, rtd1 is open
B1 --> 2
V2
B0: error, HTC (high temperature cutout) fault B2 --> 4
B1: error, high RA (refrigeration) temperature fault B3 --> 8
V3 B4 --> 16
B0: warning, low level in the bath
B5 --> 32
B1: warning, low temperature
B6 --> 64
B2: warning, high temperature
B7 --> 128
B3: error, low level in the bath
B4: error, low temperature fault
B5: error, high temperature fault
B6: error, low temperature fixed* fault
B7: error, high temperature fixed** fault
V4
B3: idle, circulator** is running
B5: error, circulator** fault
V5
B0: error, pump speed fault
B1: error, motor overloaded
B2: error, high pressure cutout
B3: idle, maximum cooling
B4: idle, cooling
B5: idle, maximum heating
B6: idle, heating
====== ======================================================== ==============================================
"""
from frappy.core import StringIO, Parameter, Readable, HasIO, \ from frappy.core import StringIO, Parameter, Readable, HasIO, \
Drivable, FloatRange, IDLE, ERROR, WARN, BoolType Drivable, FloatRange, IDLE, ERROR, WARN, BoolType
@ -34,57 +88,64 @@ class SensorA10(HasIO, Readable):
value = Parameter('internal temperature', unit='degC') value = Parameter('internal temperature', unit='degC')
def get_par(self, cmd): def get_par(self, cmd):
"""
All the reading commands starts with 'R', in the source code all the commands are written without 'R' (except
'RUFS').The result of a reading command is a value in the format '20C', without spaces.
:param cmd: any hardware command
:return: 'R'+cmd
"""
new_cmd = 'R' + cmd new_cmd = 'R' + cmd
reply = self.communicate(new_cmd) reply = self.communicate(new_cmd)
if any(unit.isalpha() for unit in reply): if any(unit.isalpha() for unit in reply):
reply = ''.join(unit for unit in reply if not unit.isalpha()) reply = ''.join(unit for unit in reply if not unit.isalpha())
return float(reply) return float(reply)
# def set_par(self, cmd, arg):
# new_cmd = 'S' + cmd.format(arg=arg)
# return self.communicate(new_cmd)
# # return self.get_par(cmd)
def read_value(self): def read_value(self):
"""
Reading internal temperature sensor value.
"""
return self.get_par('T') return self.get_par('T')
def read_status(self): def read_status(self):
result_str = self.communicate('RUFS') result_str = self.communicate('RUFS') # read unit fault status
values_str = result_str.strip().split() values_str = result_str.strip().split()
values_int = [int(val) for val in values_str] values_int = [int(val) for val in values_str]
v1, v2, v3, v4, v5 = values_int[:5] v1, v2, v3, v4, v5 = values_int #[:5]
status_messages = [ status_messages = [
(ERROR, 'high tempr. cutout fault', v2, 0), (ERROR, 'high tempr. cutout fault', 2, 0),
(ERROR, 'high RA tempr. fault', v2, 1), (ERROR, 'high RA tempr. fault', 2, 1),
(ERROR, 'high temperature fixed fault', v3, 7), (ERROR, 'high temperature fixed fault', 3, 7),
(ERROR, 'low temperature fixed fault', v3, 6), (ERROR, 'low temperature fixed fault', 3, 6),
(ERROR, 'high temperature fault', v3, 5), (ERROR, 'high temperature fault', 3, 5),
(ERROR, 'low temperature fault', v3, 4), (ERROR, 'low temperature fault', 3, 4),
(ERROR, 'low level fault', v3, 3), (ERROR, 'low level fault', 3, 3),
(ERROR, 'circulator fault', v4, 5), (ERROR, 'circulator fault', 4, 5),
(ERROR, 'high press. cutout', v5, 2), (ERROR, 'high press. cutout', 5, 2),
(ERROR, 'motor overloaded', v5, 1), (ERROR, 'motor overloaded', 5, 1),
(ERROR, 'pump speed fault', v5, 0), (ERROR, 'pump speed fault', 5, 0),
(WARN, 'open internal sensor', v1, 7), (WARN, 'open internal sensor', 1, 7),
(WARN, 'shorted internal sensor', v1, 6), (WARN, 'shorted internal sensor', 1, 6),
(WARN, 'high temperature warn', v3, 2), (WARN, 'high temperature warn', 3, 2),
(WARN, 'low temperature warn', v3, 1), (WARN, 'low temperature warn', 3, 1),
(WARN, 'low level warn', v3, 0), (WARN, 'low level warn', 3, 0),
(IDLE, 'max. heating', v5, 5), (IDLE, 'max. heating', 5, 5),
(IDLE, 'heating', v5, 6), (IDLE, 'heating', 5, 6),
(IDLE, 'cooling', v5, 4), (IDLE, 'cooling', 5, 4),
(IDLE, 'max cooling', v5, 3), (IDLE, 'max cooling', 5, 3),
(IDLE, '', v4, 3), (IDLE, '', 4, 3),
] ]
for status_type, status_msg, vi,bit in status_messages: for status_type, status_msg, vi, bit in status_messages:
if vi & (1 << bit): if values_int[vi-1] & (1 << bit):
print(status_type, status_msg, vi, bit)
return status_type, status_msg return status_type, status_msg
return WARN, 'circulation off' return WARN, 'circulation off'
class TemperatureLoopA10(HasConvergence, SensorA10, Drivable): class TemperatureLoopA10(SensorA10, Drivable):
value = Parameter('temperature', unit='degC') value = Parameter('temperature', unit='degC')
target = Parameter('setpoint/target', datatype=FloatRange, unit='degC', default=0) target = Parameter('setpoint/target', datatype=FloatRange, unit='degC', default=0)
circ_on = Parameter('is circulation running', BoolType(), readonly=False, default=False) circ_on = Parameter('is circulation running', BoolType(), readonly=False, default=False)
@ -108,9 +169,12 @@ class TemperatureLoopA10(HasConvergence, SensorA10, Drivable):
return self.get_par('S') return self.get_par('S')
def write_target(self, target): def write_target(self, target):
"""
:param target: here, it serves as an equivalent to a setpoint.
"""
self.write_circ_on('1') self.write_circ_on('1')
self.communicate(f'SS {target}') self.communicate(f'SS {target}')
self.start_state() # self.start_state()
return target return target
## heat PID ## heat PID