From db94def69451717aaecdda530d9d69ba90cbf232 Mon Sep 17 00:00:00 2001 From: l_samenv Date: Wed, 17 May 2023 16:56:49 +0200 Subject: [PATCH] newest version of okasanas drivers --- doc/source/frappy_psi.rst | 7 ++ frappy_psi/SR.py | 209 +++++++++++++++++++++++++++++++++++++ frappy_psi/thermofisher.py | 130 +++++++++++++++++------ 3 files changed, 313 insertions(+), 33 deletions(-) create mode 100644 frappy_psi/SR.py diff --git a/doc/source/frappy_psi.rst b/doc/source/frappy_psi.rst index db5cb3c..12e21d4 100644 --- a/doc/source/frappy_psi.rst +++ b/doc/source/frappy_psi.rst @@ -25,3 +25,10 @@ Calibrated sensors and control loop not yet supported. :show-inheritance: :members: +Bath Thermostat Thermofisher +............................ + +.. automodule:: frappy_psi.thermofisher + :show-inheritance: + :members: + diff --git a/frappy_psi/SR.py b/frappy_psi/SR.py new file mode 100644 index 0000000..c5f5906 --- /dev/null +++ b/frappy_psi/SR.py @@ -0,0 +1,209 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# ***************************************************************************** +# This program is free software; you can redistribute it and/or modify it under +# the terms of the GNU General Public License as published by the Free Software +# Foundation; either version 2 of the License, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but WITHOUT +# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +# FOR A PARTICULAR PURPOSE. See the GNU General Public License for more +# details. +# +# You should have received a copy of the GNU General Public License along with +# this program; if not, write to the Free Software Foundation, Inc., +# 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA +# +# Module authors: +# Daniel Margineda , Oksana Shliakhtun +# ***************************************************************************** +"""Signal Recovery SR7270: lockin amplifier for AC susceptibility""" + +from frappy.core import Readable, Parameter, FloatRange, TupleOf, \ + HasIO, StringIO, IntRange, BoolType, Writable, EnumType + + +class SR_IO(StringIO): + end_of_line = b'\x00' + identification = [('ID', r'.*')] # Identification; causes the lock-in amplifier to respond with the number 7270 + + def communicate(self, cmd): # remove dash from terminator + reply = super().communicate(cmd) + status = self._conn.readbytes(2, 0.1) # get the 2 status bytes + return reply + ';%d;%d' % tuple(status) + + +class Ametek(StringIO, HasIO): + ioClass = SR_IO + + def comm(self, cmd): + reply, status, overload = self.communicate(cmd).split(b';') + if overload != b'0': + self.status = (self.Status.WARN, f'overload {overload}') + self.status = (self.Status.IDLE, '') + return reply + + +class XY(Ametek, Readable): + value = Parameter('X, Y', datatype=TupleOf(FloatRange(unit='V'), FloatRange(unit='V'))) + vmode = Parameter('control mode', EnumType(both_grounded=0, A=1, B=2, A_B_diff=3), readonly=False) + range = Parameter('sensitivity value', FloatRange(0.00, 1), unit='V', default=1) + autosen_on = Parameter('is auto sensitivity on', BoolType(), readonly=False) + noise_control = Parameter('noise control mode', BoolType(), readonly=False) + phase = Parameter('reference phase control', FloatRange(-360, 360), unit='deg', readonly=False) + + sen_range = {name: value + 1 for value, name in enumerate( + ['2nV', '5nV', '10nV', '20nV', '50nV', '100nV', '200nV', '500nV', '1uV', + '2uV', '5uV', '10uV', '20uV', '50uV', '100uV', '200uV', '500uV', '1mV', + '2mV', '5mV', '10mV', '20mV', '50mV', '100mV', '200mV', '500mV', '1V'] + )} + irange = Parameter('sensitivity index', EnumType('sensitivity index range', sen_range), readonly=False) + + time_const = {value: name for value, name in enumerate( + [('10us', 'N/A'), ('20us', 'N/A'), ('50us', 'N/A'), ('100us', 'N/A'), + ('200us', 'N/A'), ('500us', '500us'), ('1ms', '1ms'), ('2ms', '2ms'), + ('5ms', '5ms'), ('10ms', '10ms'), ('20ms', 'N/A'), ('50ms', 'N/A'), + ('100ms', 'N/A'), ('200ms', 'N/A'), ('500ms', 'N/A'), ('1s', 'N/A'), + ('2s', 'N/A'), ('5s', 'N/A'), ('10s', 'N/A'), ('20s', 'N/A'), ('50s', 'N/A'), + ('100s', 'N/A'), ('200s', 'N/A'), ('500s', 'N/A'), ('1ks', 'N/A'), + ('10ks', 'N/A'), ('20ks', 'N/A'), ('50ks', 'N/A'), ('100ks', 'N/A')] + )} + itc = Parameter('time const. index', EnumType('time const. index range', time_const), readonly=False) + + def read_vmode(self): + return self.comm('VMODE') + + def write_vmode(self, vmode): + self.comm(f'IMODE {0}') + return self.comm(f'VMODE {vmode}') + + def read_autosen_on(self): + return self.comm('AUTOMATIC') + + def write_autosen_on(self, autosen_on): + return self.comm(f'AUTOMATIC {autosen_on}') + + def read_irange(self): + return self.comm('SEN') + + def write_irange(self, irange): + self.comm(f'IMODE {0}') + self.comm(f'SEN {irange}') + self.read_range() + return irange + + def read_range(self): + return self.comm('SEN.') # range value + + def write_range(self): + self.comm(f'IMODE {0}') + curr_value = self.read_range() + new_value = self.value + c_ind = None # closest parameters + c_diff = None + for index, value in self.sen_range.items(): + diff = abs(curr_value - value) + if c_diff is None or diff < c_diff: + c_ind = index + c_diff = diff + if abs(curr_value - new_value) < c_diff: + return self.comm(f'SEN {c_ind}') + else: + for index, value in self.sen_range.items(): + diff = abs(new_value - value) + if c_diff is None or diff < c_diff: + c_ind = index + c_diff = diff + return self.comm(f'SEN {c_ind}') + + def read_noise_control(self): + return self.comm('NOISEMODE') + + def write_noise_control(self, noise_control): + return self.comm(f'NOISEMODE {noise_control}') + + def read_tc(self): + return self.comm('TC.') + + def read_itc(self): + return self.comm(f'TC') + + # def write_tc(self, itc): + # if self.noise_control == 0: + # self.itc = self. + + def read_value(self): + reply = self.comm('XY.').split(',') + x = float(reply[0]) + y = float(reply[1]) + return x, y + + def write_value(self, value): + return self.comm(f'XY {value}') + + +class Frequency(XY, Writable): + value = Parameter('oscill. frequen. control', FloatRange(0.001, 250e3), unit='Hz', readonly=False) + target = Parameter('target frequency', FloatRange(0.001, 250e3), unit='Hz', readonly=False) + + def read_value(self): + return self.comm('OF.') + + def write_target(self,): + target = self.target() + return self.comm(f'OF. {target}') + + +class Amplitude(XY, Writable): + value = Parameter('oscill. amplit. control', FloatRange(0.00, 5), unit='V_rms', readonly=False) + target = Parameter('target amplit.', FloatRange(0.00, 5), unit='V_rms', readonly=False) + + # unify the following + # dac = Parameter('output DAC channel value', datatype=TupleOf(IntRange(1, 4), FloatRange(0.0, 5000, unit='mV')), + # readonly=False, initwrite=True, default=(3,0)) + # dac = Parameter('output DAC channel value', FloatRange(-10000, 10000, unit='mV'), + # readonly=False, initwrite=True, default=0) + # oscillator amplitude module + + def read_value(self): + return self.comm('OA.') + + def write_target(self): + target = self.target() + return self.comm(f'OA. {target}') + + # external output DAC + # def read_dac(self): + # # reply = self.comm('DAC %g' % channel) # failed to add the DAC channel you want to control + # reply = self.comm('DAC 3') # stack to channel 3 + # return reply + + # def write_dac(self, value): + # # self.comm('DAC %g %g' % channel % value) + # self.comm('DAC 3 %g' % value) + # return value + + # phase and autophase + def read_phase(self): + reply = self.comm('REFP.') + return reply + + def write_phase(self, value): + self.comm(f'REFP {round(1000 * value)}') + self.read_phase() + return value + + def aphase(self): + """auto phase""" + self.read_phase() + return self.comm('AQN') + +# class Comp(Ametek, Readable): +# enablePoll = False +# value = Parameter(datatype=FloatRange(unit='V')) +# +# +# class arg(Ametek, Readable): +# enablePoll = False +# value = Parameter(datatype=FloatRange(unit='')) diff --git a/frappy_psi/thermofisher.py b/frappy_psi/thermofisher.py index 6f2ff10..18447fd 100644 --- a/frappy_psi/thermofisher.py +++ b/frappy_psi/thermofisher.py @@ -18,6 +18,60 @@ # Module authors: # Oksana Shliakhtun # ***************************************************************************** +""" RUFS Command: Description of Bits + + ====== ======================================================== ============================================== + Value Description + ====== ======================================================== ============================================== + V1 + B6: warning, rtd1 (internal temp. sensor) is shorted + B0 --> 1 + B7: warning, rtd1 is open + B1 --> 2 + V2 + B0: error, HTC (high temperature cutout) fault B2 --> 4 + + B1: error, high RA (refrigeration) temperature fault B3 --> 8 + + V3 B4 --> 16 + B0: warning, low level in the bath + B5 --> 32 + B1: warning, low temperature + B6 --> 64 + B2: warning, high temperature + B7 --> 128 + B3: error, low level in the bath + + B4: error, low temperature fault + + B5: error, high temperature fault + + B6: error, low temperature fixed* fault + + B7: error, high temperature fixed** fault + + V4 + B3: idle, circulator** is running + + B5: error, circulator** fault + + V5 + B0: error, pump speed fault + + B1: error, motor overloaded + + B2: error, high pressure cutout + + B3: idle, maximum cooling + + B4: idle, cooling + + B5: idle, maximum heating + + B6: idle, heating + ====== ======================================================== ============================================== + +""" from frappy.core import StringIO, Parameter, Readable, HasIO, \ Drivable, FloatRange, IDLE, ERROR, WARN, BoolType @@ -34,57 +88,64 @@ class SensorA10(HasIO, Readable): value = Parameter('internal temperature', unit='degC') def get_par(self, cmd): + """ + All the reading commands starts with 'R', in the source code all the commands are written without 'R' (except + 'RUFS').The result of a reading command is a value in the format '20C', without spaces. + + :param cmd: any hardware command + + :return: 'R'+cmd + """ new_cmd = 'R' + cmd reply = self.communicate(new_cmd) if any(unit.isalpha() for unit in reply): reply = ''.join(unit for unit in reply if not unit.isalpha()) return float(reply) - # def set_par(self, cmd, arg): - # new_cmd = 'S' + cmd.format(arg=arg) - # return self.communicate(new_cmd) - # # return self.get_par(cmd) - def read_value(self): + """ + Reading internal temperature sensor value. + """ return self.get_par('T') def read_status(self): - result_str = self.communicate('RUFS') + result_str = self.communicate('RUFS') # read unit fault status values_str = result_str.strip().split() values_int = [int(val) for val in values_str] - v1, v2, v3, v4, v5 = values_int[:5] + v1, v2, v3, v4, v5 = values_int #[:5] status_messages = [ - (ERROR, 'high tempr. cutout fault', v2, 0), - (ERROR, 'high RA tempr. fault', v2, 1), - (ERROR, 'high temperature fixed fault', v3, 7), - (ERROR, 'low temperature fixed fault', v3, 6), - (ERROR, 'high temperature fault', v3, 5), - (ERROR, 'low temperature fault', v3, 4), - (ERROR, 'low level fault', v3, 3), - (ERROR, 'circulator fault', v4, 5), - (ERROR, 'high press. cutout', v5, 2), - (ERROR, 'motor overloaded', v5, 1), - (ERROR, 'pump speed fault', v5, 0), - (WARN, 'open internal sensor', v1, 7), - (WARN, 'shorted internal sensor', v1, 6), - (WARN, 'high temperature warn', v3, 2), - (WARN, 'low temperature warn', v3, 1), - (WARN, 'low level warn', v3, 0), - (IDLE, 'max. heating', v5, 5), - (IDLE, 'heating', v5, 6), - (IDLE, 'cooling', v5, 4), - (IDLE, 'max cooling', v5, 3), - (IDLE, '', v4, 3), + (ERROR, 'high tempr. cutout fault', 2, 0), + (ERROR, 'high RA tempr. fault', 2, 1), + (ERROR, 'high temperature fixed fault', 3, 7), + (ERROR, 'low temperature fixed fault', 3, 6), + (ERROR, 'high temperature fault', 3, 5), + (ERROR, 'low temperature fault', 3, 4), + (ERROR, 'low level fault', 3, 3), + (ERROR, 'circulator fault', 4, 5), + (ERROR, 'high press. cutout', 5, 2), + (ERROR, 'motor overloaded', 5, 1), + (ERROR, 'pump speed fault', 5, 0), + (WARN, 'open internal sensor', 1, 7), + (WARN, 'shorted internal sensor', 1, 6), + (WARN, 'high temperature warn', 3, 2), + (WARN, 'low temperature warn', 3, 1), + (WARN, 'low level warn', 3, 0), + (IDLE, 'max. heating', 5, 5), + (IDLE, 'heating', 5, 6), + (IDLE, 'cooling', 5, 4), + (IDLE, 'max cooling', 5, 3), + (IDLE, '', 4, 3), ] - for status_type, status_msg, vi,bit in status_messages: - if vi & (1 << bit): + for status_type, status_msg, vi, bit in status_messages: + if values_int[vi-1] & (1 << bit): + print(status_type, status_msg, vi, bit) return status_type, status_msg - return WARN, 'circulation off' + return WARN, 'circulation off' -class TemperatureLoopA10(HasConvergence, SensorA10, Drivable): +class TemperatureLoopA10(SensorA10, Drivable): value = Parameter('temperature', unit='degC') target = Parameter('setpoint/target', datatype=FloatRange, unit='degC', default=0) circ_on = Parameter('is circulation running', BoolType(), readonly=False, default=False) @@ -108,9 +169,12 @@ class TemperatureLoopA10(HasConvergence, SensorA10, Drivable): return self.get_par('S') def write_target(self, target): + """ + :param target: here, it serves as an equivalent to a setpoint. + """ self.write_circ_on('1') self.communicate(f'SS {target}') - self.start_state() + # self.start_state() return target ## heat PID