newest version of okasanas drivers

This commit is contained in:
l_samenv 2023-05-17 16:56:49 +02:00
parent 1407514458
commit db94def694
3 changed files with 313 additions and 33 deletions

View File

@ -25,3 +25,10 @@ Calibrated sensors and control loop not yet supported.
:show-inheritance: :show-inheritance:
:members: :members:
Bath Thermostat Thermofisher
............................
.. automodule:: frappy_psi.thermofisher
:show-inheritance:
:members:

209
frappy_psi/SR.py Normal file
View File

@ -0,0 +1,209 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# *****************************************************************************
# This program is free software; you can redistribute it and/or modify it under
# the terms of the GNU General Public License as published by the Free Software
# Foundation; either version 2 of the License, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more
# details.
#
# You should have received a copy of the GNU General Public License along with
# this program; if not, write to the Free Software Foundation, Inc.,
# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
#
# Module authors:
# Daniel Margineda <daniel.margineda@psi.ch>, Oksana Shliakhtun <oksana.shliakhtun@psi.ch>
# *****************************************************************************
"""Signal Recovery SR7270: lockin amplifier for AC susceptibility"""
from frappy.core import Readable, Parameter, FloatRange, TupleOf, \
HasIO, StringIO, IntRange, BoolType, Writable, EnumType
class SR_IO(StringIO):
end_of_line = b'\x00'
identification = [('ID', r'.*')] # Identification; causes the lock-in amplifier to respond with the number 7270
def communicate(self, cmd): # remove dash from terminator
reply = super().communicate(cmd)
status = self._conn.readbytes(2, 0.1) # get the 2 status bytes
return reply + ';%d;%d' % tuple(status)
class Ametek(StringIO, HasIO):
ioClass = SR_IO
def comm(self, cmd):
reply, status, overload = self.communicate(cmd).split(b';')
if overload != b'0':
self.status = (self.Status.WARN, f'overload {overload}')
self.status = (self.Status.IDLE, '')
return reply
class XY(Ametek, Readable):
value = Parameter('X, Y', datatype=TupleOf(FloatRange(unit='V'), FloatRange(unit='V')))
vmode = Parameter('control mode', EnumType(both_grounded=0, A=1, B=2, A_B_diff=3), readonly=False)
range = Parameter('sensitivity value', FloatRange(0.00, 1), unit='V', default=1)
autosen_on = Parameter('is auto sensitivity on', BoolType(), readonly=False)
noise_control = Parameter('noise control mode', BoolType(), readonly=False)
phase = Parameter('reference phase control', FloatRange(-360, 360), unit='deg', readonly=False)
sen_range = {name: value + 1 for value, name in enumerate(
['2nV', '5nV', '10nV', '20nV', '50nV', '100nV', '200nV', '500nV', '1uV',
'2uV', '5uV', '10uV', '20uV', '50uV', '100uV', '200uV', '500uV', '1mV',
'2mV', '5mV', '10mV', '20mV', '50mV', '100mV', '200mV', '500mV', '1V']
)}
irange = Parameter('sensitivity index', EnumType('sensitivity index range', sen_range), readonly=False)
time_const = {value: name for value, name in enumerate(
[('10us', 'N/A'), ('20us', 'N/A'), ('50us', 'N/A'), ('100us', 'N/A'),
('200us', 'N/A'), ('500us', '500us'), ('1ms', '1ms'), ('2ms', '2ms'),
('5ms', '5ms'), ('10ms', '10ms'), ('20ms', 'N/A'), ('50ms', 'N/A'),
('100ms', 'N/A'), ('200ms', 'N/A'), ('500ms', 'N/A'), ('1s', 'N/A'),
('2s', 'N/A'), ('5s', 'N/A'), ('10s', 'N/A'), ('20s', 'N/A'), ('50s', 'N/A'),
('100s', 'N/A'), ('200s', 'N/A'), ('500s', 'N/A'), ('1ks', 'N/A'),
('10ks', 'N/A'), ('20ks', 'N/A'), ('50ks', 'N/A'), ('100ks', 'N/A')]
)}
itc = Parameter('time const. index', EnumType('time const. index range', time_const), readonly=False)
def read_vmode(self):
return self.comm('VMODE')
def write_vmode(self, vmode):
self.comm(f'IMODE {0}')
return self.comm(f'VMODE {vmode}')
def read_autosen_on(self):
return self.comm('AUTOMATIC')
def write_autosen_on(self, autosen_on):
return self.comm(f'AUTOMATIC {autosen_on}')
def read_irange(self):
return self.comm('SEN')
def write_irange(self, irange):
self.comm(f'IMODE {0}')
self.comm(f'SEN {irange}')
self.read_range()
return irange
def read_range(self):
return self.comm('SEN.') # range value
def write_range(self):
self.comm(f'IMODE {0}')
curr_value = self.read_range()
new_value = self.value
c_ind = None # closest parameters
c_diff = None
for index, value in self.sen_range.items():
diff = abs(curr_value - value)
if c_diff is None or diff < c_diff:
c_ind = index
c_diff = diff
if abs(curr_value - new_value) < c_diff:
return self.comm(f'SEN {c_ind}')
else:
for index, value in self.sen_range.items():
diff = abs(new_value - value)
if c_diff is None or diff < c_diff:
c_ind = index
c_diff = diff
return self.comm(f'SEN {c_ind}')
def read_noise_control(self):
return self.comm('NOISEMODE')
def write_noise_control(self, noise_control):
return self.comm(f'NOISEMODE {noise_control}')
def read_tc(self):
return self.comm('TC.')
def read_itc(self):
return self.comm(f'TC')
# def write_tc(self, itc):
# if self.noise_control == 0:
# self.itc = self.
def read_value(self):
reply = self.comm('XY.').split(',')
x = float(reply[0])
y = float(reply[1])
return x, y
def write_value(self, value):
return self.comm(f'XY {value}')
class Frequency(XY, Writable):
value = Parameter('oscill. frequen. control', FloatRange(0.001, 250e3), unit='Hz', readonly=False)
target = Parameter('target frequency', FloatRange(0.001, 250e3), unit='Hz', readonly=False)
def read_value(self):
return self.comm('OF.')
def write_target(self,):
target = self.target()
return self.comm(f'OF. {target}')
class Amplitude(XY, Writable):
value = Parameter('oscill. amplit. control', FloatRange(0.00, 5), unit='V_rms', readonly=False)
target = Parameter('target amplit.', FloatRange(0.00, 5), unit='V_rms', readonly=False)
# unify the following
# dac = Parameter('output DAC channel value', datatype=TupleOf(IntRange(1, 4), FloatRange(0.0, 5000, unit='mV')),
# readonly=False, initwrite=True, default=(3,0))
# dac = Parameter('output DAC channel value', FloatRange(-10000, 10000, unit='mV'),
# readonly=False, initwrite=True, default=0)
# oscillator amplitude module
def read_value(self):
return self.comm('OA.')
def write_target(self):
target = self.target()
return self.comm(f'OA. {target}')
# external output DAC
# def read_dac(self):
# # reply = self.comm('DAC %g' % channel) # failed to add the DAC channel you want to control
# reply = self.comm('DAC 3') # stack to channel 3
# return reply
# def write_dac(self, value):
# # self.comm('DAC %g %g' % channel % value)
# self.comm('DAC 3 %g' % value)
# return value
# phase and autophase
def read_phase(self):
reply = self.comm('REFP.')
return reply
def write_phase(self, value):
self.comm(f'REFP {round(1000 * value)}')
self.read_phase()
return value
def aphase(self):
"""auto phase"""
self.read_phase()
return self.comm('AQN')
# class Comp(Ametek, Readable):
# enablePoll = False
# value = Parameter(datatype=FloatRange(unit='V'))
#
#
# class arg(Ametek, Readable):
# enablePoll = False
# value = Parameter(datatype=FloatRange(unit=''))

View File

@ -18,6 +18,60 @@
# Module authors: # Module authors:
# Oksana Shliakhtun <oksana.shliakhtun@psi.ch> # Oksana Shliakhtun <oksana.shliakhtun@psi.ch>
# ***************************************************************************** # *****************************************************************************
""" RUFS Command: Description of Bits
====== ======================================================== ==============================================
Value Description
====== ======================================================== ==============================================
V1
B6: warning, rtd1 (internal temp. sensor) is shorted
B0 --> 1
B7: warning, rtd1 is open
B1 --> 2
V2
B0: error, HTC (high temperature cutout) fault B2 --> 4
B1: error, high RA (refrigeration) temperature fault B3 --> 8
V3 B4 --> 16
B0: warning, low level in the bath
B5 --> 32
B1: warning, low temperature
B6 --> 64
B2: warning, high temperature
B7 --> 128
B3: error, low level in the bath
B4: error, low temperature fault
B5: error, high temperature fault
B6: error, low temperature fixed* fault
B7: error, high temperature fixed** fault
V4
B3: idle, circulator** is running
B5: error, circulator** fault
V5
B0: error, pump speed fault
B1: error, motor overloaded
B2: error, high pressure cutout
B3: idle, maximum cooling
B4: idle, cooling
B5: idle, maximum heating
B6: idle, heating
====== ======================================================== ==============================================
"""
from frappy.core import StringIO, Parameter, Readable, HasIO, \ from frappy.core import StringIO, Parameter, Readable, HasIO, \
Drivable, FloatRange, IDLE, ERROR, WARN, BoolType Drivable, FloatRange, IDLE, ERROR, WARN, BoolType
@ -34,57 +88,64 @@ class SensorA10(HasIO, Readable):
value = Parameter('internal temperature', unit='degC') value = Parameter('internal temperature', unit='degC')
def get_par(self, cmd): def get_par(self, cmd):
"""
All the reading commands starts with 'R', in the source code all the commands are written without 'R' (except
'RUFS').The result of a reading command is a value in the format '20C', without spaces.
:param cmd: any hardware command
:return: 'R'+cmd
"""
new_cmd = 'R' + cmd new_cmd = 'R' + cmd
reply = self.communicate(new_cmd) reply = self.communicate(new_cmd)
if any(unit.isalpha() for unit in reply): if any(unit.isalpha() for unit in reply):
reply = ''.join(unit for unit in reply if not unit.isalpha()) reply = ''.join(unit for unit in reply if not unit.isalpha())
return float(reply) return float(reply)
# def set_par(self, cmd, arg):
# new_cmd = 'S' + cmd.format(arg=arg)
# return self.communicate(new_cmd)
# # return self.get_par(cmd)
def read_value(self): def read_value(self):
"""
Reading internal temperature sensor value.
"""
return self.get_par('T') return self.get_par('T')
def read_status(self): def read_status(self):
result_str = self.communicate('RUFS') result_str = self.communicate('RUFS') # read unit fault status
values_str = result_str.strip().split() values_str = result_str.strip().split()
values_int = [int(val) for val in values_str] values_int = [int(val) for val in values_str]
v1, v2, v3, v4, v5 = values_int[:5] v1, v2, v3, v4, v5 = values_int #[:5]
status_messages = [ status_messages = [
(ERROR, 'high tempr. cutout fault', v2, 0), (ERROR, 'high tempr. cutout fault', 2, 0),
(ERROR, 'high RA tempr. fault', v2, 1), (ERROR, 'high RA tempr. fault', 2, 1),
(ERROR, 'high temperature fixed fault', v3, 7), (ERROR, 'high temperature fixed fault', 3, 7),
(ERROR, 'low temperature fixed fault', v3, 6), (ERROR, 'low temperature fixed fault', 3, 6),
(ERROR, 'high temperature fault', v3, 5), (ERROR, 'high temperature fault', 3, 5),
(ERROR, 'low temperature fault', v3, 4), (ERROR, 'low temperature fault', 3, 4),
(ERROR, 'low level fault', v3, 3), (ERROR, 'low level fault', 3, 3),
(ERROR, 'circulator fault', v4, 5), (ERROR, 'circulator fault', 4, 5),
(ERROR, 'high press. cutout', v5, 2), (ERROR, 'high press. cutout', 5, 2),
(ERROR, 'motor overloaded', v5, 1), (ERROR, 'motor overloaded', 5, 1),
(ERROR, 'pump speed fault', v5, 0), (ERROR, 'pump speed fault', 5, 0),
(WARN, 'open internal sensor', v1, 7), (WARN, 'open internal sensor', 1, 7),
(WARN, 'shorted internal sensor', v1, 6), (WARN, 'shorted internal sensor', 1, 6),
(WARN, 'high temperature warn', v3, 2), (WARN, 'high temperature warn', 3, 2),
(WARN, 'low temperature warn', v3, 1), (WARN, 'low temperature warn', 3, 1),
(WARN, 'low level warn', v3, 0), (WARN, 'low level warn', 3, 0),
(IDLE, 'max. heating', v5, 5), (IDLE, 'max. heating', 5, 5),
(IDLE, 'heating', v5, 6), (IDLE, 'heating', 5, 6),
(IDLE, 'cooling', v5, 4), (IDLE, 'cooling', 5, 4),
(IDLE, 'max cooling', v5, 3), (IDLE, 'max cooling', 5, 3),
(IDLE, '', v4, 3), (IDLE, '', 4, 3),
] ]
for status_type, status_msg, vi,bit in status_messages: for status_type, status_msg, vi, bit in status_messages:
if vi & (1 << bit): if values_int[vi-1] & (1 << bit):
print(status_type, status_msg, vi, bit)
return status_type, status_msg return status_type, status_msg
return WARN, 'circulation off' return WARN, 'circulation off'
class TemperatureLoopA10(HasConvergence, SensorA10, Drivable): class TemperatureLoopA10(SensorA10, Drivable):
value = Parameter('temperature', unit='degC') value = Parameter('temperature', unit='degC')
target = Parameter('setpoint/target', datatype=FloatRange, unit='degC', default=0) target = Parameter('setpoint/target', datatype=FloatRange, unit='degC', default=0)
circ_on = Parameter('is circulation running', BoolType(), readonly=False, default=False) circ_on = Parameter('is circulation running', BoolType(), readonly=False, default=False)
@ -108,9 +169,12 @@ class TemperatureLoopA10(HasConvergence, SensorA10, Drivable):
return self.get_par('S') return self.get_par('S')
def write_target(self, target): def write_target(self, target):
"""
:param target: here, it serves as an equivalent to a setpoint.
"""
self.write_circ_on('1') self.write_circ_on('1')
self.communicate(f'SS {target}') self.communicate(f'SS {target}')
self.start_state() # self.start_state()
return target return target
## heat PID ## heat PID